[ 
https://issues.apache.org/jira/browse/GOBBLIN-2004?focusedWorklogId=908975&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-908975
 ]

ASF GitHub Bot logged work on GOBBLIN-2004:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Mar/24 18:44
            Start Date: 08/Mar/24 18:44
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3891:
URL: https://github.com/apache/gobblin/pull/3891#discussion_r1518047237


##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static 
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ *   The super class for key management
+ *   This class uses a scheduled task to do re-login to re-fetch token on a
+ *   configurable schedule. It also uses a second scheduled task
+ *   to renew the delegation token after each login. Both the re-login 
interval and the token
+ *   renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
+ */
+public abstract class AbstractAppSecurityManager extends AbstractIdleService {

Review Comment:
   shouldn't the name reflect the token renewal that appears to be the crux of 
this class?
   
   e.g. `AbstractTokenAutoRenewingAppSecurityManager`?



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java:
##########
@@ -17,167 +17,50 @@
 
 package org.apache.gobblin.yarn;
 
-import java.io.IOException;
+import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.Credentials;
 import org.apache.helix.Criteria;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.model.Message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.gobblin.cluster.GobblinClusterManager;
 import org.apache.gobblin.cluster.GobblinHelixMessagingService;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExecutorsUtils;
-
-import static 
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+import org.apache.gobblin.yarn.helix.HelixMessageSubTypes;
 
 /**
  * <p>
- *   The super class for key management
- *   This class uses a scheduled task to do re-login to re-fetch token on a
- *   configurable schedule. It also uses a second scheduled task
- *   to renew the delegation token after each login. Both the re-login 
interval and the token
- *   renewing interval are configurable.
+ *   Inherits the {@link AbstractAppSecurityManager} and implements Helix
+ *   specific mechanisms for refreshing security credentials.
+ *
+ *   NOTE: Although this class contains the term "Yarn", the class does not 
specifically
+ *   rely on Yarn. It is a generic class that is used by Gobblin Yarn 
applications which
+ *   are typically managed by Helix

Review Comment:
   The description suggests Helix is an integral part, but given that you just 
made it optional, let's rework the javadoc to clarify/reconcile.



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static 
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ *   The super class for key management
+ *   This class uses a scheduled task to do re-login to re-fetch token on a
+ *   configurable schedule. It also uses a second scheduled task
+ *   to renew the delegation token after each login. Both the re-login 
interval and the token
+ *   renewing interval are configurable.

Review Comment:
   Any advice on the relationship between the two?  E.g., should the re-login 
interval be greater than the renewal interval?
   
   nit: renewing interval => renewal interval



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static 
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ *   The super class for key management
+ *   This class uses a scheduled task to do re-login to re-fetch token on a
+ *   configurable schedule. It also uses a second scheduled task
+ *   to renew the delegation token after each login. Both the re-login 
interval and the token
+ *   renewing interval are configurable.
+ * </p>
+ * @author Zihan Li

Review Comment:
   not any more... even so, I don't believe attribution is in our coding 
standard



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static 
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ *   The super class for key management
+ *   This class uses a scheduled task to do re-login to re-fetch token on a
+ *   configurable schedule. It also uses a second scheduled task
+ *   to renew the delegation token after each login. Both the re-login 
interval and the token
+ *   renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
+ */
+public abstract class AbstractAppSecurityManager extends AbstractIdleService {
+
+  protected Logger LOGGER = LoggerFactory.getLogger(this.getClass().getName());
+
+  protected Config config;
+
+  protected final FileSystem fs;
+  protected final Path tokenFilePath;
+
+  protected Credentials credentials = new Credentials();
+  private final long loginIntervalInMinutes;
+  private final long tokenRenewIntervalInMinutes;
+  private final ScheduledExecutorService loginExecutor;
+  private final ScheduledExecutorService tokenRenewExecutor;
+
+  private Optional<ScheduledFuture<?>> scheduledTokenRenewTask = 
Optional.absent();
+
+  // This flag is used to tell if this is the first login. If yes, no token 
updated message will be
+  // sent to the controller and the participants as they may not be up running 
yet. The first login
+  // happens after this class starts up so the token gets regularly refreshed 
before the next login.
+  protected volatile boolean firstLogin = true;
+
+  public AbstractAppSecurityManager(Config config, FileSystem fs, Path 
tokenFilePath) {
+    this.config = config;
+    this.fs = fs;
+    this.tokenFilePath = tokenFilePath;
+    this.fs.makeQualified(tokenFilePath);
+    this.loginIntervalInMinutes = ConfigUtils.getLong(config, 
GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES,
+        GobblinYarnConfigurationKeys.DEFAULT_LOGIN_INTERVAL_IN_MINUTES);
+    this.tokenRenewIntervalInMinutes = ConfigUtils.getLong(config, 
GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES,
+        GobblinYarnConfigurationKeys.DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES);
+
+    this.loginExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), 
Optional.of("KeytabReLoginExecutor")));
+    this.tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), 
Optional.of("TokenRenewExecutor")));
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    LOGGER.info("Starting the " + this.getClass().getSimpleName());
+
+    LOGGER.info(
+        String.format("Scheduling the login task with an interval of %d 
minute(s)", this.loginIntervalInMinutes));
+
+    // Schedule the Kerberos re-login task
+    this.loginExecutor.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          loginAndScheduleTokenRenewal();
+        }catch(Exception e){
+          LOGGER.error("Error during login, will continue the thread and try 
next time.");
+        }
+      }
+    }, this.loginIntervalInMinutes, this.loginIntervalInMinutes, 
TimeUnit.MINUTES);

Review Comment:
   I may be missing something, but why do we first delay by `loginInterval`?  
Don't we want/need to log in immediately, hence:
   ```
   scheduleAtFixedRate(theRunnable, 0, this.loginIntervalInMinutes, TU.MINS);
   ```
   ?



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static 
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ *   The super class for key management
+ *   This class uses a scheduled task to do re-login to re-fetch token on a
+ *   configurable schedule. It also uses a second scheduled task
+ *   to renew the delegation token after each login. Both the re-login 
interval and the token
+ *   renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
+ */
+public abstract class AbstractAppSecurityManager extends AbstractIdleService {
+
+  protected Logger LOGGER = LoggerFactory.getLogger(this.getClass().getName());
+
+  protected Config config;
+
+  protected final FileSystem fs;
+  protected final Path tokenFilePath;
+
+  protected Credentials credentials = new Credentials();
+  private final long loginIntervalInMinutes;
+  private final long tokenRenewIntervalInMinutes;
+  private final ScheduledExecutorService loginExecutor;
+  private final ScheduledExecutorService tokenRenewExecutor;
+
+  private Optional<ScheduledFuture<?>> scheduledTokenRenewTask = 
Optional.absent();
+
+  // This flag is used to tell if this is the first login. If yes, no token 
updated message will be
+  // sent to the controller and the participants as they may not be up running 
yet. The first login
+  // happens after this class starts up so the token gets regularly refreshed 
before the next login.
+  protected volatile boolean firstLogin = true;
+
+  public AbstractAppSecurityManager(Config config, FileSystem fs, Path 
tokenFilePath) {
+    this.config = config;
+    this.fs = fs;
+    this.tokenFilePath = tokenFilePath;
+    this.fs.makeQualified(tokenFilePath);
+    this.loginIntervalInMinutes = ConfigUtils.getLong(config, 
GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES,
+        GobblinYarnConfigurationKeys.DEFAULT_LOGIN_INTERVAL_IN_MINUTES);
+    this.tokenRenewIntervalInMinutes = ConfigUtils.getLong(config, 
GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES,
+        GobblinYarnConfigurationKeys.DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES);
+
+    this.loginExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), 
Optional.of("KeytabReLoginExecutor")));
+    this.tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), 
Optional.of("TokenRenewExecutor")));
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    LOGGER.info("Starting the " + this.getClass().getSimpleName());
+
+    LOGGER.info(
+        String.format("Scheduling the login task with an interval of %d 
minute(s)", this.loginIntervalInMinutes));
+
+    // Schedule the Kerberos re-login task
+    this.loginExecutor.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          loginAndScheduleTokenRenewal();
+        }catch(Exception e){
+          LOGGER.error("Error during login, will continue the thread and try 
next time.");
+        }
+      }
+    }, this.loginIntervalInMinutes, this.loginIntervalInMinutes, 
TimeUnit.MINUTES);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    LOGGER.info("Stopping the " + this.getClass().getSimpleName());
+
+    if (this.scheduledTokenRenewTask.isPresent()) {

Review Comment:
   Since this reads what's set in `scheduleTokenRenewalTask`, and that is 
clearly multi-threaded, let's synchronize both that method and this one (to 
guarantee `shutDown()` is not called concurrently).



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/helix/HelixClusterLifecycleManager.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn.helix;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.Message;
+
+import com.google.common.base.Throwables;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.GobblinHelixConstants;
+import org.apache.gobblin.cluster.GobblinHelixMessagingService;
+import org.apache.gobblin.cluster.HelixUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class HelixClusterLifecycleManager implements Closeable {
+  private final Config config;
+  private final String helixInstanceName;
+
+  @Getter
+  private final AtomicBoolean isApplicationRunningFlag;
+
+  private final boolean isHelixClusterManaged;

Review Comment:
   I can tell you're stuck w/ the naming due to prior config keys, so maybe just 
add javadoc to describe what it means in plain language,
   e.g. akin to "useSharedHelixCluster" or (!)"shouldCreateHelixCluster".



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/helix/HelixClusterLifecycleManager.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn.helix;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.Message;
+
+import com.google.common.base.Throwables;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.GobblinHelixConstants;
+import org.apache.gobblin.cluster.GobblinHelixMessagingService;
+import org.apache.gobblin.cluster.HelixUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class HelixClusterLifecycleManager implements Closeable {
+  private final Config config;
+  private final String helixInstanceName;
+
+  @Getter
+  private final AtomicBoolean isApplicationRunningFlag;
+
+  private final boolean isHelixClusterManaged;
+  @Getter
+  private final HelixManager helixManager;
+  private final GobblinHelixMessagingService messagingService;
+
+  public HelixClusterLifecycleManager(Config config) throws IOException {
+    this.config = config;
+    this.isApplicationRunningFlag = new AtomicBoolean(false);
+
+    String zkConnectionString = 
config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    log.info("Using ZooKeeper connection string: " + zkConnectionString);
+
+    this.isHelixClusterManaged = ConfigUtils.getBoolean(this.config, 
GobblinClusterConfigurationKeys.IS_HELIX_CLUSTER_MANAGED,
+        GobblinClusterConfigurationKeys.DEFAULT_IS_HELIX_CLUSTER_MANAGED);
+
+    this.helixManager = HelixManagerFactory.getZKHelixManager(
+        
config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY), 
GobblinClusterUtils.getHostname(),
+        InstanceType.SPECTATOR, zkConnectionString);
+    this.messagingService = new 
GobblinHelixMessagingService(this.helixManager);
+    this.helixInstanceName = ConfigUtils.getString(config, 
GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
+        GobblinClusterManager.class.getSimpleName());
+
+    log.info("Starting Helix cluster");
+    connectHelixManager();
+    createHelixCluster();
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    if (this.isApplicationRunningFlag.get()) {
+      this.sendShutdownRequest();
+    }
+    this.disconnectHelixManager();
+  }
+
+  void createHelixCluster() {
+    if (this.isHelixClusterManaged) {
+      log.info("Helix cluster is managed; skipping creation of Helix cluster");
+    } else {
+      String clusterName = 
this.config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+      boolean overwriteExistingCluster = ConfigUtils.getBoolean(this.config, 
GobblinClusterConfigurationKeys.HELIX_CLUSTER_OVERWRITE_KEY,
+          GobblinClusterConfigurationKeys.DEFAULT_HELIX_CLUSTER_OVERWRITE);
+      log.info("Creating Helix cluster {} with overwrite: {}", clusterName, 
overwriteExistingCluster);
+      
HelixUtils.createGobblinHelixCluster(this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY),
+          clusterName, overwriteExistingCluster);
+      log.info("Created Helix cluster " + clusterName);
+    }
+  }
+
+  void connectHelixManager() {

Review Comment:
   Why so much package-private access?  I'd expect either `protected` or `private`.



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -898,49 +858,31 @@ private Path getHdfsLogDir(Path appWorkDir) throws 
IOException {
     return logRootDir;
   }
 
-  private AbstractYarnAppSecurityManager buildSecurityManager() throws 
IOException {
+  /**
+   *
+   * @return
+   * @throws IOException
+   */
+  private AbstractAppSecurityManager buildSecurityManager() throws IOException 
{
     Path tokenFilePath = new Path(this.fs.getHomeDirectory(), 
this.applicationName + Path.SEPARATOR +
         GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
+    String securityManagerClassName = ConfigUtils.getString(config, 
GobblinYarnConfigurationKeys.SECURITY_MANAGER_CLASS, 
GobblinYarnConfigurationKeys.DEFAULT_SECURITY_MANAGER_CLASS);
 
-    ClassAliasResolver<AbstractYarnAppSecurityManager> aliasResolver = new 
ClassAliasResolver<>(
-        AbstractYarnAppSecurityManager.class);
     try {
-     return (AbstractYarnAppSecurityManager) 
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(aliasResolver.resolve(
-          ConfigUtils.getString(config, 
GobblinYarnConfigurationKeys.SECURITY_MANAGER_CLASS, 
GobblinYarnConfigurationKeys.DEFAULT_SECURITY_MANAGER_CLASS))), this.config, 
this.helixManager, this.fs,
-          tokenFilePath);
+      if (helixClusterLifecycleManager.isPresent()) {
+        HelixManager helixManager = 
helixClusterLifecycleManager.get().getHelixManager();
+        ClassAliasResolver<AbstractYarnAppSecurityManager> aliasResolver = new 
ClassAliasResolver<>(AbstractYarnAppSecurityManager.class);
+        return (AbstractYarnAppSecurityManager) 
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(aliasResolver.resolve(securityManagerClassName)),
 this.config, helixManager, this.fs,
+            tokenFilePath);
+      }
+
+      ClassAliasResolver aliasResolver = new 
ClassAliasResolver<>(AbstractAppSecurityManager.class);
+      return (AbstractAppSecurityManager) 
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(aliasResolver.resolve(securityManagerClassName)),
 this.config, this.fs, tokenFilePath);

Review Comment:
   It's somewhat confusing to have alternate `ClassAliasResolver`s, one typed and 
the other `CAR<?>`.
   
   Couldn't we just use one and take advantage of the ability to pass `null` as 
the second param of the constructor, by doing the following?
   ```
   helixClusterLifecycleManager.map(HCLM::getHelixManager).orElse(null)
   ```



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static 
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ *   The super class for key management
+ *   This class uses a scheduled task to do re-login to re-fetch token on a
+ *   configurable schedule. It also uses a second scheduled task
+ *   to renew the delegation token after each login. Both the re-login 
interval and the token
+ *   renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
+ */
+public abstract class AbstractAppSecurityManager extends AbstractIdleService {
+
+  protected Logger LOGGER = LoggerFactory.getLogger(this.getClass().getName());
+
+  protected Config config;
+
+  protected final FileSystem fs;
+  protected final Path tokenFilePath;
+
+  protected Credentials credentials = new Credentials();
+  private final long loginIntervalInMinutes;
+  private final long tokenRenewIntervalInMinutes;
+  private final ScheduledExecutorService loginExecutor;
+  private final ScheduledExecutorService tokenRenewExecutor;
+
+  private Optional<ScheduledFuture<?>> scheduledTokenRenewTask = 
Optional.absent();
+
+  // This flag is used to tell if this is the first login. If yes, no token 
updated message will be
+  // sent to the controller and the participants as they may not be up running 
yet. The first login
+  // happens after this class starts up so the token gets regularly refreshed 
before the next login.
+  protected volatile boolean firstLogin = true;
+
+  public AbstractAppSecurityManager(Config config, FileSystem fs, Path 
tokenFilePath) {
+    this.config = config;
+    this.fs = fs;
+    this.tokenFilePath = tokenFilePath;
+    this.fs.makeQualified(tokenFilePath);
+    this.loginIntervalInMinutes = ConfigUtils.getLong(config, 
GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES,
+        GobblinYarnConfigurationKeys.DEFAULT_LOGIN_INTERVAL_IN_MINUTES);
+    this.tokenRenewIntervalInMinutes = ConfigUtils.getLong(config, 
GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES,
+        GobblinYarnConfigurationKeys.DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES);
+
+    this.loginExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), 
Optional.of("KeytabReLoginExecutor")));
+    this.tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), 
Optional.of("TokenRenewExecutor")));
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    LOGGER.info("Starting the " + this.getClass().getSimpleName());
+
+    LOGGER.info(
+        String.format("Scheduling the login task with an interval of %d 
minute(s)", this.loginIntervalInMinutes));
+
+    // Schedule the Kerberos re-login task
+    this.loginExecutor.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          loginAndScheduleTokenRenewal();
+        }catch(Exception e){

Review Comment:
   whitespace - should be `} catch (Exception e) {`



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static 
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ *   The super class for key management
+ *   This class uses a scheduled task to do re-login to re-fetch token on a
+ *   configurable schedule. It also uses a second scheduled task
+ *   to renew the delegation token after each login. Both the re-login 
interval and the token
+ *   renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
+ */
+public abstract class AbstractAppSecurityManager extends AbstractIdleService {
+
+  protected Logger LOGGER = LoggerFactory.getLogger(this.getClass().getName());
+
+  protected Config config;
+
+  protected final FileSystem fs;
+  protected final Path tokenFilePath;
+
+  protected Credentials credentials = new Credentials();
+  private final long loginIntervalInMinutes;
+  private final long tokenRenewIntervalInMinutes;
+  private final ScheduledExecutorService loginExecutor;
+  private final ScheduledExecutorService tokenRenewExecutor;
+
+  private Optional<ScheduledFuture<?>> scheduledTokenRenewTask = 
Optional.absent();
+
+  // This flag is used to tell if this is the first login. If yes, no token 
updated message will be
+  // sent to the controller and the participants as they may not be up running 
yet. The first login
+  // happens after this class starts up so the token gets regularly refreshed 
before the next login.
+  protected volatile boolean firstLogin = true;
+
+  public AbstractAppSecurityManager(Config config, FileSystem fs, Path 
tokenFilePath) {
+    this.config = config;
+    this.fs = fs;
+    this.tokenFilePath = tokenFilePath;
+    this.fs.makeQualified(tokenFilePath);
+    this.loginIntervalInMinutes = ConfigUtils.getLong(config, 
GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES,
+        GobblinYarnConfigurationKeys.DEFAULT_LOGIN_INTERVAL_IN_MINUTES);
+    this.tokenRenewIntervalInMinutes = ConfigUtils.getLong(config, 
GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES,
+        GobblinYarnConfigurationKeys.DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES);
+
+    this.loginExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), 
Optional.of("KeytabReLoginExecutor")));
+    this.tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), 
Optional.of("TokenRenewExecutor")));
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    LOGGER.info("Starting the " + this.getClass().getSimpleName());
+
+    LOGGER.info(
+        String.format("Scheduling the login task with an interval of %d 
minute(s)", this.loginIntervalInMinutes));

Review Comment:
   NBD, but I'd combine - `"Starting {} with login task interval of {} mins"`



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static 
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ *   The super class for key management
+ *   This class uses a scheduled task to do re-login to re-fetch token on a
+ *   configurable schedule. It also uses a second scheduled task
+ *   to renew the delegation token after each login. Both the re-login 
interval and the token
+ *   renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
+ */
+public abstract class AbstractAppSecurityManager extends AbstractIdleService {
+
+  protected Logger LOGGER = LoggerFactory.getLogger(this.getClass().getName());
+
+  protected Config config;
+
+  protected final FileSystem fs;
+  protected final Path tokenFilePath;
+
+  protected Credentials credentials = new Credentials();
+  private final long loginIntervalInMinutes;
+  private final long tokenRenewIntervalInMinutes;
+  private final ScheduledExecutorService loginExecutor;
+  private final ScheduledExecutorService tokenRenewExecutor;
+
+  private Optional<ScheduledFuture<?>> scheduledTokenRenewTask = 
Optional.absent();
+
+  // This flag is used to tell if this is the first login. If yes, no token 
updated message will be
+  // sent to the controller and the participants as they may not be up running 
yet. The first login
+  // happens after this class starts up so the token gets regularly refreshed 
before the next login.
+  protected volatile boolean firstLogin = true;
+
+  public AbstractAppSecurityManager(Config config, FileSystem fs, Path 
tokenFilePath) {
+    this.config = config;
+    this.fs = fs;
+    this.tokenFilePath = tokenFilePath;
+    this.fs.makeQualified(tokenFilePath);
+    this.loginIntervalInMinutes = ConfigUtils.getLong(config, 
GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES,
+        GobblinYarnConfigurationKeys.DEFAULT_LOGIN_INTERVAL_IN_MINUTES);
+    this.tokenRenewIntervalInMinutes = ConfigUtils.getLong(config, 
GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES,
+        GobblinYarnConfigurationKeys.DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES);
+
+    this.loginExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), 
Optional.of("KeytabReLoginExecutor")));
+    this.tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), 
Optional.of("TokenRenewExecutor")));
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    LOGGER.info("Starting the " + this.getClass().getSimpleName());
+
+    LOGGER.info(
+        String.format("Scheduling the login task with an interval of %d 
minute(s)", this.loginIntervalInMinutes));
+
+    // Schedule the Kerberos re-login task
+    this.loginExecutor.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          loginAndScheduleTokenRenewal();
+        }catch(Exception e){
+          LOGGER.error("Error during login, will continue the thread and try 
next time.");
+        }
+      }
+    }, this.loginIntervalInMinutes, this.loginIntervalInMinutes, 
TimeUnit.MINUTES);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    LOGGER.info("Stopping the " + this.getClass().getSimpleName());
+
+    if (this.scheduledTokenRenewTask.isPresent()) {
+      this.scheduledTokenRenewTask.get().cancel(true);
+    }
+    ExecutorsUtils.shutdownExecutorService(this.loginExecutor, 
Optional.of(LOGGER));
+    ExecutorsUtils.shutdownExecutorService(this.tokenRenewExecutor, 
Optional.of(LOGGER));
+  }
+
+  protected void scheduleTokenRenewTask() {
+    LOGGER.info(String.format("Scheduling the token renew task with an 
interval of %d minute(s)",
+        this.tokenRenewIntervalInMinutes));
+
+    this.scheduledTokenRenewTask = Optional.<ScheduledFuture<?>>of(
+        this.tokenRenewExecutor.scheduleAtFixedRate(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              renewDelegationToken();
+            } catch (IOException ioe) {
+              LOGGER.error("Failed to renew delegation token", ioe);
+              throw Throwables.propagate(ioe);
+            } catch (InterruptedException ie) {
+              LOGGER.error("Token renew task has been interrupted");
+              Thread.currentThread().interrupt();
+            }
+          }
+        }, this.tokenRenewIntervalInMinutes, this.tokenRenewIntervalInMinutes, 
TimeUnit.MINUTES));
+  }
+
+  //The whole logic for each re-login
+  public void loginAndScheduleTokenRenewal() {
+    try {
+      // Cancel the currently scheduled token renew task
+      if (scheduledTokenRenewTask.isPresent() && 
scheduledTokenRenewTask.get().cancel(true)) {
+        LOGGER.info("Cancelled the token renew task");
+      }
+
+      login();
+      if (firstLogin) {
+        firstLogin = false;
+      }
+
+      // Re-schedule the token renew task after re-login
+      scheduleTokenRenewTask();
+    } catch (IOException | InterruptedException ioe) {
+      LOGGER.error("Failed to login from keytab", ioe);
+      throw Throwables.propagate(ioe);

Review Comment:
   let's directly wrap w/ `RuntimeException`; see - 
https://github.com/google/guava/wiki/Why-we-deprecated-Throwables.propagate



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/helix/HelixClusterLifecycleManager.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn.helix;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.Message;
+
+import com.google.common.base.Throwables;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.GobblinHelixConstants;
+import org.apache.gobblin.cluster.GobblinHelixMessagingService;
+import org.apache.gobblin.cluster.HelixUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class HelixClusterLifecycleManager implements Closeable {

Review Comment:
   understandable, since you didn't have time to fully refine, but javadoc 
please before checkin



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/helix/HelixClusterLifecycleManager.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn.helix;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.Message;
+
+import com.google.common.base.Throwables;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.GobblinHelixConstants;
+import org.apache.gobblin.cluster.GobblinHelixMessagingService;
+import org.apache.gobblin.cluster.HelixUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class HelixClusterLifecycleManager implements Closeable {
+  private final Config config;
+  private final String helixInstanceName;
+
+  @Getter
+  private final AtomicBoolean isApplicationRunningFlag;

Review Comment:
   do you really want to provide callers the `AtomicBoolean` so they can adjust 
it, or instead just give them:
   a.) a read-only `boolean` of the current value?
   OR
   b.) a `setIsNowRunning()` method
   ?



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static 
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ *   The super class for key management
+ *   This class uses a scheduled task to do re-login to re-fetch token on a
+ *   configurable schedule. It also uses a second scheduled task
+ *   to renew the delegation token after each login. Both the re-login 
interval and the token
+ *   renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
+ */
+public abstract class AbstractAppSecurityManager extends AbstractIdleService {
+
+  protected Logger LOGGER = LoggerFactory.getLogger(this.getClass().getName());
+
+  protected Config config;
+
+  protected final FileSystem fs;
+  protected final Path tokenFilePath;
+
+  protected Credentials credentials = new Credentials();
+  private final long loginIntervalInMinutes;
+  private final long tokenRenewIntervalInMinutes;
+  private final ScheduledExecutorService loginExecutor;
+  private final ScheduledExecutorService tokenRenewExecutor;
+
+  private Optional<ScheduledFuture<?>> scheduledTokenRenewTask = 
Optional.absent();
+
+  // This flag is used to tell if this is the first login. If yes, no token 
updated message will be
+  // sent to the controller and the participants as they may not be up running 
yet. The first login
+  // happens after this class starts up so the token gets regularly refreshed 
before the next login.
+  protected volatile boolean firstLogin = true;
+
+  public AbstractAppSecurityManager(Config config, FileSystem fs, Path 
tokenFilePath) {
+    this.config = config;
+    this.fs = fs;
+    this.tokenFilePath = tokenFilePath;
+    this.fs.makeQualified(tokenFilePath);
+    this.loginIntervalInMinutes = ConfigUtils.getLong(config, 
GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES,
+        GobblinYarnConfigurationKeys.DEFAULT_LOGIN_INTERVAL_IN_MINUTES);
+    this.tokenRenewIntervalInMinutes = ConfigUtils.getLong(config, 
GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES,
+        GobblinYarnConfigurationKeys.DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES);
+
+    this.loginExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), 
Optional.of("KeytabReLoginExecutor")));
+    this.tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), 
Optional.of("TokenRenewExecutor")));
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    LOGGER.info("Starting the " + this.getClass().getSimpleName());
+
+    LOGGER.info(
+        String.format("Scheduling the login task with an interval of %d 
minute(s)", this.loginIntervalInMinutes));
+
+    // Schedule the Kerberos re-login task
+    this.loginExecutor.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          loginAndScheduleTokenRenewal();
+        }catch(Exception e){
+          LOGGER.error("Error during login, will continue the thread and try 
next time.");
+        }
+      }
+    }, this.loginIntervalInMinutes, this.loginIntervalInMinutes, 
TimeUnit.MINUTES);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    LOGGER.info("Stopping the " + this.getClass().getSimpleName());
+
+    if (this.scheduledTokenRenewTask.isPresent()) {
+      this.scheduledTokenRenewTask.get().cancel(true);
+    }
+    ExecutorsUtils.shutdownExecutorService(this.loginExecutor, 
Optional.of(LOGGER));
+    ExecutorsUtils.shutdownExecutorService(this.tokenRenewExecutor, 
Optional.of(LOGGER));
+  }
+
+  protected void scheduleTokenRenewTask() {
+    LOGGER.info(String.format("Scheduling the token renew task with an 
interval of %d minute(s)",
+        this.tokenRenewIntervalInMinutes));
+
+    this.scheduledTokenRenewTask = Optional.<ScheduledFuture<?>>of(
+        this.tokenRenewExecutor.scheduleAtFixedRate(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              renewDelegationToken();
+            } catch (IOException ioe) {
+              LOGGER.error("Failed to renew delegation token", ioe);
+              throw Throwables.propagate(ioe);
+            } catch (InterruptedException ie) {
+              LOGGER.error("Token renew task has been interrupted");
+              Thread.currentThread().interrupt();
+            }
+          }
+        }, this.tokenRenewIntervalInMinutes, this.tokenRenewIntervalInMinutes, 
TimeUnit.MINUTES));
+  }
+
+  //The whole logic for each re-login
+  public void loginAndScheduleTokenRenewal() {
+    try {
+      // Cancel the currently scheduled token renew task
+      if (scheduledTokenRenewTask.isPresent() && 
scheduledTokenRenewTask.get().cancel(true)) {

Review Comment:
   method should be `synchronized` along w/ `scheduleTokenRenewTask`



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static 
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ *   The super class for key management
+ *   This class uses a scheduled task to do re-login to re-fetch token on a
+ *   configurable schedule. It also uses a second scheduled task
+ *   to renew the delegation token after each login. Both the re-login 
interval and the token
+ *   renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
+ */
+public abstract class AbstractAppSecurityManager extends AbstractIdleService {
+
+  protected Logger LOGGER = LoggerFactory.getLogger(this.getClass().getName());
+
+  protected Config config;
+
+  protected final FileSystem fs;
+  protected final Path tokenFilePath;
+
+  protected Credentials credentials = new Credentials();
+  private final long loginIntervalInMinutes;
+  private final long tokenRenewIntervalInMinutes;
+  private final ScheduledExecutorService loginExecutor;
+  private final ScheduledExecutorService tokenRenewExecutor;
+
+  private Optional<ScheduledFuture<?>> scheduledTokenRenewTask = 
Optional.absent();
+
+  // This flag is used to tell if this is the first login. If yes, no token 
updated message will be
+  // sent to the controller and the participants as they may not be up running 
yet. The first login
+  // happens after this class starts up so the token gets regularly refreshed 
before the next login.
+  protected volatile boolean firstLogin = true;
+
+  public AbstractAppSecurityManager(Config config, FileSystem fs, Path 
tokenFilePath) {
+    this.config = config;
+    this.fs = fs;
+    this.tokenFilePath = tokenFilePath;
+    this.fs.makeQualified(tokenFilePath);
+    this.loginIntervalInMinutes = ConfigUtils.getLong(config, 
GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES,
+        GobblinYarnConfigurationKeys.DEFAULT_LOGIN_INTERVAL_IN_MINUTES);
+    this.tokenRenewIntervalInMinutes = ConfigUtils.getLong(config, 
GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES,
+        GobblinYarnConfigurationKeys.DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES);
+
+    this.loginExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), 
Optional.of("KeytabReLoginExecutor")));
+    this.tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), 
Optional.of("TokenRenewExecutor")));
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    LOGGER.info("Starting the " + this.getClass().getSimpleName());
+
+    LOGGER.info(
+        String.format("Scheduling the login task with an interval of %d 
minute(s)", this.loginIntervalInMinutes));
+
+    // Schedule the Kerberos re-login task
+    this.loginExecutor.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          loginAndScheduleTokenRenewal();
+        }catch(Exception e){
+          LOGGER.error("Error during login, will continue the thread and try 
next time.");
+        }
+      }
+    }, this.loginIntervalInMinutes, this.loginIntervalInMinutes, 
TimeUnit.MINUTES);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    LOGGER.info("Stopping the " + this.getClass().getSimpleName());
+
+    if (this.scheduledTokenRenewTask.isPresent()) {
+      this.scheduledTokenRenewTask.get().cancel(true);
+    }
+    ExecutorsUtils.shutdownExecutorService(this.loginExecutor, 
Optional.of(LOGGER));
+    ExecutorsUtils.shutdownExecutorService(this.tokenRenewExecutor, 
Optional.of(LOGGER));
+  }
+
+  protected void scheduleTokenRenewTask() {
+    LOGGER.info(String.format("Scheduling the token renew task with an 
interval of %d minute(s)",
+        this.tokenRenewIntervalInMinutes));
+
+    this.scheduledTokenRenewTask = Optional.<ScheduledFuture<?>>of(
+        this.tokenRenewExecutor.scheduleAtFixedRate(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              renewDelegationToken();
+            } catch (IOException ioe) {
+              LOGGER.error("Failed to renew delegation token", ioe);
+              throw Throwables.propagate(ioe);
+            } catch (InterruptedException ie) {
+              LOGGER.error("Token renew task has been interrupted");
+              Thread.currentThread().interrupt();
+            }
+          }
+        }, this.tokenRenewIntervalInMinutes, this.tokenRenewIntervalInMinutes, 
TimeUnit.MINUTES));
+  }
+
+  //The whole logic for each re-login
+  public void loginAndScheduleTokenRenewal() {
+    try {
+      // Cancel the currently scheduled token renew task
+      if (scheduledTokenRenewTask.isPresent() && 
scheduledTokenRenewTask.get().cancel(true)) {
+        LOGGER.info("Cancelled the token renew task");
+      }
+
+      login();
+      if (firstLogin) {
+        firstLogin = false;
+      }
+
+      // Re-schedule the token renew task after re-login
+      scheduleTokenRenewTask();
+    } catch (IOException | InterruptedException ioe) {
+      LOGGER.error("Failed to login from keytab", ioe);
+      throw Throwables.propagate(ioe);
+    }
+  }
+
+  /**
+   * Write the current credentials to the token file.
+   */
+  protected synchronized void writeDelegationTokenToFile(Credentials cred) 
throws IOException {
+
+    if (this.fs.exists(this.tokenFilePath)) {
+      LOGGER.info("Deleting existing token file " + this.tokenFilePath);
+      this.fs.delete(this.tokenFilePath, false);
+    }
+
+    LOGGER.debug("creating new token file {} with 644 permission.", 
this.tokenFilePath);

Review Comment:
   The log message says 644, but this looks more like a `setPermission` to `0600`.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 908975)
    Time Spent: 0.5h  (was: 20m)

> Make a Gobblin yarn app launcher that does not depend on Helix
> --------------------------------------------------------------
>
>                 Key: GOBBLIN-2004
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2004
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Matthew Ho
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to