[
https://issues.apache.org/jira/browse/GOBBLIN-2004?focusedWorklogId=908975&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-908975
]
ASF GitHub Bot logged work on GOBBLIN-2004:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 08/Mar/24 18:44
Start Date: 08/Mar/24 18:44
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3891:
URL: https://github.com/apache/gobblin/pull/3891#discussion_r1518047237
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ * The super class for key management
+ * This class uses a scheduled task to do re-login to re-fetch token on a
+ * configurable schedule. It also uses a second scheduled task
+ * to renew the delegation token after each login. Both the re-login
interval and the token
+ * renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
+ */
+public abstract class AbstractAppSecurityManager extends AbstractIdleService {
Review Comment:
Shouldn't the name reflect the token renewal, which appears to be the crux of
this class?
E.g. `AbstractTokenAutoRenewingAppSecurityManager`?
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java:
##########
@@ -17,167 +17,50 @@
package org.apache.gobblin.yarn;
-import java.io.IOException;
+import java.util.Optional;
import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.Credentials;
import org.apache.helix.Criteria;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.model.Message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.gobblin.cluster.GobblinClusterManager;
import org.apache.gobblin.cluster.GobblinHelixMessagingService;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExecutorsUtils;
-
-import static
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+import org.apache.gobblin.yarn.helix.HelixMessageSubTypes;
/**
* <p>
- * The super class for key management
- * This class uses a scheduled task to do re-login to re-fetch token on a
- * configurable schedule. It also uses a second scheduled task
- * to renew the delegation token after each login. Both the re-login
interval and the token
- * renewing interval are configurable.
+ * Inherits the {@link AbstractAppSecurityManager} and implements Helix
+ * specific mechanisms for refreshing security credentials.
+ *
+ * NOTE: Although this class contains the term "Yarn", the class does not
specifically
+ * rely on Yarn. It is a generic class that is used by Gobblin Yarn
applications which
+ * are typically managed by Helix
Review Comment:
The description suggests Helix is an integral part, but given that you just made it
optional, let's rework the javadoc to clarify/reconcile.
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ * The super class for key management
+ * This class uses a scheduled task to do re-login to re-fetch token on a
+ * configurable schedule. It also uses a second scheduled task
+ * to renew the delegation token after each login. Both the re-login
interval and the token
+ * renewing interval are configurable.
Review Comment:
Any advice on the relationship between the two? E.g., should the re-login interval be
greater than the renewal interval?
nit: renewing interval => renewal interval
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ * The super class for key management
+ * This class uses a scheduled task to do re-login to re-fetch token on a
+ * configurable schedule. It also uses a second scheduled task
+ * to renew the delegation token after each login. Both the re-login
interval and the token
+ * renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
Review Comment:
Not anymore... Even so, I don't believe author attribution is part of our coding
standard.
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ * The super class for key management
+ * This class uses a scheduled task to do re-login to re-fetch token on a
+ * configurable schedule. It also uses a second scheduled task
+ * to renew the delegation token after each login. Both the re-login
interval and the token
+ * renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
+ */
+public abstract class AbstractAppSecurityManager extends AbstractIdleService {
+
+ protected Logger LOGGER = LoggerFactory.getLogger(this.getClass().getName());
+
+ protected Config config;
+
+ protected final FileSystem fs;
+ protected final Path tokenFilePath;
+
+ protected Credentials credentials = new Credentials();
+ private final long loginIntervalInMinutes;
+ private final long tokenRenewIntervalInMinutes;
+ private final ScheduledExecutorService loginExecutor;
+ private final ScheduledExecutorService tokenRenewExecutor;
+
+ private Optional<ScheduledFuture<?>> scheduledTokenRenewTask =
Optional.absent();
+
+ // This flag is used to tell if this is the first login. If yes, no token
updated message will be
+ // sent to the controller and the participants as they may not be up running
yet. The first login
+ // happens after this class starts up so the token gets regularly refreshed
before the next login.
+ protected volatile boolean firstLogin = true;
+
+ public AbstractAppSecurityManager(Config config, FileSystem fs, Path
tokenFilePath) {
+ this.config = config;
+ this.fs = fs;
+ this.tokenFilePath = tokenFilePath;
+ this.fs.makeQualified(tokenFilePath);
+ this.loginIntervalInMinutes = ConfigUtils.getLong(config,
GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES,
+ GobblinYarnConfigurationKeys.DEFAULT_LOGIN_INTERVAL_IN_MINUTES);
+ this.tokenRenewIntervalInMinutes = ConfigUtils.getLong(config,
GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES,
+ GobblinYarnConfigurationKeys.DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES);
+
+ this.loginExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
Optional.of("KeytabReLoginExecutor")));
+ this.tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
Optional.of("TokenRenewExecutor")));
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ LOGGER.info("Starting the " + this.getClass().getSimpleName());
+
+ LOGGER.info(
+ String.format("Scheduling the login task with an interval of %d
minute(s)", this.loginIntervalInMinutes));
+
+ // Schedule the Kerberos re-login task
+ this.loginExecutor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ loginAndScheduleTokenRenewal();
+ }catch(Exception e){
+ LOGGER.error("Error during login, will continue the thread and try
next time.");
+ }
+ }
+ }, this.loginIntervalInMinutes, this.loginIntervalInMinutes,
TimeUnit.MINUTES);
Review Comment:
I may be missing something, but why do we first delay by `loginInterval`?
Don't we want/need to log in immediately, hence:
```
scheduleAtFixedRate(theRunnable, 0, this.loginIntervalInMinutes, TU.MINS);
```
?
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ * The super class for key management
+ * This class uses a scheduled task to do re-login to re-fetch token on a
+ * configurable schedule. It also uses a second scheduled task
+ * to renew the delegation token after each login. Both the re-login
interval and the token
+ * renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
+ */
+public abstract class AbstractAppSecurityManager extends AbstractIdleService {
+
+ protected Logger LOGGER = LoggerFactory.getLogger(this.getClass().getName());
+
+ protected Config config;
+
+ protected final FileSystem fs;
+ protected final Path tokenFilePath;
+
+ protected Credentials credentials = new Credentials();
+ private final long loginIntervalInMinutes;
+ private final long tokenRenewIntervalInMinutes;
+ private final ScheduledExecutorService loginExecutor;
+ private final ScheduledExecutorService tokenRenewExecutor;
+
+ private Optional<ScheduledFuture<?>> scheduledTokenRenewTask =
Optional.absent();
+
+ // This flag is used to tell if this is the first login. If yes, no token
updated message will be
+ // sent to the controller and the participants as they may not be up running
yet. The first login
+ // happens after this class starts up so the token gets regularly refreshed
before the next login.
+ protected volatile boolean firstLogin = true;
+
+ public AbstractAppSecurityManager(Config config, FileSystem fs, Path
tokenFilePath) {
+ this.config = config;
+ this.fs = fs;
+ this.tokenFilePath = tokenFilePath;
+ this.fs.makeQualified(tokenFilePath);
+ this.loginIntervalInMinutes = ConfigUtils.getLong(config,
GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES,
+ GobblinYarnConfigurationKeys.DEFAULT_LOGIN_INTERVAL_IN_MINUTES);
+ this.tokenRenewIntervalInMinutes = ConfigUtils.getLong(config,
GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES,
+ GobblinYarnConfigurationKeys.DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES);
+
+ this.loginExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
Optional.of("KeytabReLoginExecutor")));
+ this.tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
Optional.of("TokenRenewExecutor")));
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ LOGGER.info("Starting the " + this.getClass().getSimpleName());
+
+ LOGGER.info(
+ String.format("Scheduling the login task with an interval of %d
minute(s)", this.loginIntervalInMinutes));
+
+ // Schedule the Kerberos re-login task
+ this.loginExecutor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ loginAndScheduleTokenRenewal();
+ }catch(Exception e){
+ LOGGER.error("Error during login, will continue the thread and try
next time.");
+ }
+ }
+ }, this.loginIntervalInMinutes, this.loginIntervalInMinutes,
TimeUnit.MINUTES);
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ LOGGER.info("Stopping the " + this.getClass().getSimpleName());
+
+ if (this.scheduledTokenRenewTask.isPresent()) {
Review Comment:
Since this reads what's set in `scheduleTokenRenewalTask`, and that is
clearly multi-threaded, let's synchronize both that method and this one (to
guarantee `shutDown()` is not called concurrently).
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/helix/HelixClusterLifecycleManager.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn.helix;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.Message;
+
+import com.google.common.base.Throwables;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.GobblinHelixConstants;
+import org.apache.gobblin.cluster.GobblinHelixMessagingService;
+import org.apache.gobblin.cluster.HelixUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class HelixClusterLifecycleManager implements Closeable {
+ private final Config config;
+ private final String helixInstanceName;
+
+ @Getter
+ private final AtomicBoolean isApplicationRunningFlag;
+
+ private final boolean isHelixClusterManaged;
Review Comment:
I can tell you're stuck with the naming due to prior config keys, so maybe just
add javadoc describing what it means in plain language,
e.g. that it is equivalent to "useSharedHelixCluster", or to (!)"shouldCreateHelixCluster".
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/helix/HelixClusterLifecycleManager.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn.helix;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.Message;
+
+import com.google.common.base.Throwables;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.GobblinHelixConstants;
+import org.apache.gobblin.cluster.GobblinHelixMessagingService;
+import org.apache.gobblin.cluster.HelixUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class HelixClusterLifecycleManager implements Closeable {
+ private final Config config;
+ private final String helixInstanceName;
+
+ @Getter
+ private final AtomicBoolean isApplicationRunningFlag;
+
+ private final boolean isHelixClusterManaged;
+ @Getter
+ private final HelixManager helixManager;
+ private final GobblinHelixMessagingService messagingService;
+
+ public HelixClusterLifecycleManager(Config config) throws IOException {
+ this.config = config;
+ this.isApplicationRunningFlag = new AtomicBoolean(false);
+
+ String zkConnectionString =
config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+ log.info("Using ZooKeeper connection string: " + zkConnectionString);
+
+ this.isHelixClusterManaged = ConfigUtils.getBoolean(this.config,
GobblinClusterConfigurationKeys.IS_HELIX_CLUSTER_MANAGED,
+ GobblinClusterConfigurationKeys.DEFAULT_IS_HELIX_CLUSTER_MANAGED);
+
+ this.helixManager = HelixManagerFactory.getZKHelixManager(
+
config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY),
GobblinClusterUtils.getHostname(),
+ InstanceType.SPECTATOR, zkConnectionString);
+ this.messagingService = new
GobblinHelixMessagingService(this.helixManager);
+ this.helixInstanceName = ConfigUtils.getString(config,
GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
+ GobblinClusterManager.class.getSimpleName());
+
+ log.info("Starting Helix cluster");
+ connectHelixManager();
+ createHelixCluster();
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ if (this.isApplicationRunningFlag.get()) {
+ this.sendShutdownRequest();
+ }
+ this.disconnectHelixManager();
+ }
+
+ void createHelixCluster() {
+ if (this.isHelixClusterManaged) {
+ log.info("Helix cluster is managed; skipping creation of Helix cluster");
+ } else {
+ String clusterName =
this.config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+ boolean overwriteExistingCluster = ConfigUtils.getBoolean(this.config,
GobblinClusterConfigurationKeys.HELIX_CLUSTER_OVERWRITE_KEY,
+ GobblinClusterConfigurationKeys.DEFAULT_HELIX_CLUSTER_OVERWRITE);
+ log.info("Creating Helix cluster {} with overwrite: {}", clusterName,
overwriteExistingCluster);
+
HelixUtils.createGobblinHelixCluster(this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY),
+ clusterName, overwriteExistingCluster);
+ log.info("Created Helix cluster " + clusterName);
+ }
+ }
+
+ void connectHelixManager() {
Review Comment:
Why so many package-private (no access modifier) methods? I'd expect either `protected` or `private`.
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -898,49 +858,31 @@ private Path getHdfsLogDir(Path appWorkDir) throws
IOException {
return logRootDir;
}
- private AbstractYarnAppSecurityManager buildSecurityManager() throws
IOException {
+ /**
+ *
+ * @return
+ * @throws IOException
+ */
+ private AbstractAppSecurityManager buildSecurityManager() throws IOException
{
Path tokenFilePath = new Path(this.fs.getHomeDirectory(),
this.applicationName + Path.SEPARATOR +
GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
+ String securityManagerClassName = ConfigUtils.getString(config,
GobblinYarnConfigurationKeys.SECURITY_MANAGER_CLASS,
GobblinYarnConfigurationKeys.DEFAULT_SECURITY_MANAGER_CLASS);
- ClassAliasResolver<AbstractYarnAppSecurityManager> aliasResolver = new
ClassAliasResolver<>(
- AbstractYarnAppSecurityManager.class);
try {
- return (AbstractYarnAppSecurityManager)
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(aliasResolver.resolve(
- ConfigUtils.getString(config,
GobblinYarnConfigurationKeys.SECURITY_MANAGER_CLASS,
GobblinYarnConfigurationKeys.DEFAULT_SECURITY_MANAGER_CLASS))), this.config,
this.helixManager, this.fs,
- tokenFilePath);
+ if (helixClusterLifecycleManager.isPresent()) {
+ HelixManager helixManager =
helixClusterLifecycleManager.get().getHelixManager();
+ ClassAliasResolver<AbstractYarnAppSecurityManager> aliasResolver = new
ClassAliasResolver<>(AbstractYarnAppSecurityManager.class);
+ return (AbstractYarnAppSecurityManager)
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(aliasResolver.resolve(securityManagerClassName)),
this.config, helixManager, this.fs,
+ tokenFilePath);
+ }
+
+ ClassAliasResolver aliasResolver = new
ClassAliasResolver<>(AbstractAppSecurityManager.class);
+ return (AbstractAppSecurityManager)
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(aliasResolver.resolve(securityManagerClassName)),
this.config, this.fs, tokenFilePath);
Review Comment:
It's somewhat confusing to have alternate `ClassAliasResolver`s, one typed and the
other untyped (`CAR<?>`).
Couldn't we just use one and take advantage of the ability to pass `null` as
the second param of the ctor by doing:
```
helixClusterLifecycleManager.map(HCLM::getHelixManager).orElse(null)
```
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ * The super class for key management
+ * This class uses a scheduled task to do re-login to re-fetch token on a
+ * configurable schedule. It also uses a second scheduled task
+ * to renew the delegation token after each login. Both the re-login
interval and the token
+ * renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
+ */
+public abstract class AbstractAppSecurityManager extends AbstractIdleService {
+
+ protected Logger LOGGER = LoggerFactory.getLogger(this.getClass().getName());
+
+ protected Config config;
+
+ protected final FileSystem fs;
+ protected final Path tokenFilePath;
+
+ protected Credentials credentials = new Credentials();
+ private final long loginIntervalInMinutes;
+ private final long tokenRenewIntervalInMinutes;
+ private final ScheduledExecutorService loginExecutor;
+ private final ScheduledExecutorService tokenRenewExecutor;
+
+ private Optional<ScheduledFuture<?>> scheduledTokenRenewTask =
Optional.absent();
+
+ // This flag is used to tell if this is the first login. If yes, no token
updated message will be
+ // sent to the controller and the participants as they may not be up running
yet. The first login
+ // happens after this class starts up so the token gets regularly refreshed
before the next login.
+ protected volatile boolean firstLogin = true;
+
+ public AbstractAppSecurityManager(Config config, FileSystem fs, Path
tokenFilePath) {
+ this.config = config;
+ this.fs = fs;
+ this.tokenFilePath = tokenFilePath;
+ this.fs.makeQualified(tokenFilePath);
+ this.loginIntervalInMinutes = ConfigUtils.getLong(config,
GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES,
+ GobblinYarnConfigurationKeys.DEFAULT_LOGIN_INTERVAL_IN_MINUTES);
+ this.tokenRenewIntervalInMinutes = ConfigUtils.getLong(config,
GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES,
+ GobblinYarnConfigurationKeys.DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES);
+
+ this.loginExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
Optional.of("KeytabReLoginExecutor")));
+ this.tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
Optional.of("TokenRenewExecutor")));
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ LOGGER.info("Starting the " + this.getClass().getSimpleName());
+
+ LOGGER.info(
+ String.format("Scheduling the login task with an interval of %d
minute(s)", this.loginIntervalInMinutes));
+
+ // Schedule the Kerberos re-login task
+ this.loginExecutor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ loginAndScheduleTokenRenewal();
+ }catch(Exception e){
Review Comment:
nit: whitespace (should be `} catch (Exception e) {`)
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ * The super class for key management
+ * This class uses a scheduled task to do re-login to re-fetch token on a
+ * configurable schedule. It also uses a second scheduled task
+ * to renew the delegation token after each login. Both the re-login
interval and the token
+ * renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
+ */
+public abstract class AbstractAppSecurityManager extends AbstractIdleService {
+
+ protected Logger LOGGER = LoggerFactory.getLogger(this.getClass().getName());
+
+ protected Config config;
+
+ protected final FileSystem fs;
+ protected final Path tokenFilePath;
+
+ protected Credentials credentials = new Credentials();
+ private final long loginIntervalInMinutes;
+ private final long tokenRenewIntervalInMinutes;
+ private final ScheduledExecutorService loginExecutor;
+ private final ScheduledExecutorService tokenRenewExecutor;
+
+ private Optional<ScheduledFuture<?>> scheduledTokenRenewTask =
Optional.absent();
+
+ // This flag is used to tell if this is the first login. If yes, no token
updated message will be
+ // sent to the controller and the participants as they may not be up running
yet. The first login
+ // happens after this class starts up so the token gets regularly refreshed
before the next login.
+ protected volatile boolean firstLogin = true;
+
+ public AbstractAppSecurityManager(Config config, FileSystem fs, Path
tokenFilePath) {
+ this.config = config;
+ this.fs = fs;
+ this.tokenFilePath = tokenFilePath;
+ this.fs.makeQualified(tokenFilePath);
+ this.loginIntervalInMinutes = ConfigUtils.getLong(config,
GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES,
+ GobblinYarnConfigurationKeys.DEFAULT_LOGIN_INTERVAL_IN_MINUTES);
+ this.tokenRenewIntervalInMinutes = ConfigUtils.getLong(config,
GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES,
+ GobblinYarnConfigurationKeys.DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES);
+
+ this.loginExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
Optional.of("KeytabReLoginExecutor")));
+ this.tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
Optional.of("TokenRenewExecutor")));
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ LOGGER.info("Starting the " + this.getClass().getSimpleName());
+
+ LOGGER.info(
+ String.format("Scheduling the login task with an interval of %d
minute(s)", this.loginIntervalInMinutes));
Review Comment:
NBD, but I'd combine - `"Starting {} with login task interval of {} mins"`
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ * The super class for key management
+ * This class uses a scheduled task to do re-login to re-fetch token on a
+ * configurable schedule. It also uses a second scheduled task
+ * to renew the delegation token after each login. Both the re-login
interval and the token
+ * renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
+ */
+public abstract class AbstractAppSecurityManager extends AbstractIdleService {
+
+ protected Logger LOGGER = LoggerFactory.getLogger(this.getClass().getName());
+
+ protected Config config;
+
+ protected final FileSystem fs;
+ protected final Path tokenFilePath;
+
+ protected Credentials credentials = new Credentials();
+ private final long loginIntervalInMinutes;
+ private final long tokenRenewIntervalInMinutes;
+ private final ScheduledExecutorService loginExecutor;
+ private final ScheduledExecutorService tokenRenewExecutor;
+
+ private Optional<ScheduledFuture<?>> scheduledTokenRenewTask =
Optional.absent();
+
+ // This flag is used to tell if this is the first login. If yes, no token
updated message will be
+ // sent to the controller and the participants as they may not be up running
yet. The first login
+ // happens after this class starts up so the token gets regularly refreshed
before the next login.
+ protected volatile boolean firstLogin = true;
+
+ public AbstractAppSecurityManager(Config config, FileSystem fs, Path
tokenFilePath) {
+ this.config = config;
+ this.fs = fs;
+ this.tokenFilePath = tokenFilePath;
+ this.fs.makeQualified(tokenFilePath);
+ this.loginIntervalInMinutes = ConfigUtils.getLong(config,
GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES,
+ GobblinYarnConfigurationKeys.DEFAULT_LOGIN_INTERVAL_IN_MINUTES);
+ this.tokenRenewIntervalInMinutes = ConfigUtils.getLong(config,
GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES,
+ GobblinYarnConfigurationKeys.DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES);
+
+ this.loginExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
Optional.of("KeytabReLoginExecutor")));
+ this.tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
Optional.of("TokenRenewExecutor")));
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ LOGGER.info("Starting the " + this.getClass().getSimpleName());
+
+ LOGGER.info(
+ String.format("Scheduling the login task with an interval of %d
minute(s)", this.loginIntervalInMinutes));
+
+ // Schedule the Kerberos re-login task
+ this.loginExecutor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ loginAndScheduleTokenRenewal();
+ }catch(Exception e){
+ LOGGER.error("Error during login, will continue the thread and try
next time.");
+ }
+ }
+ }, this.loginIntervalInMinutes, this.loginIntervalInMinutes,
TimeUnit.MINUTES);
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ LOGGER.info("Stopping the " + this.getClass().getSimpleName());
+
+ if (this.scheduledTokenRenewTask.isPresent()) {
+ this.scheduledTokenRenewTask.get().cancel(true);
+ }
+ ExecutorsUtils.shutdownExecutorService(this.loginExecutor,
Optional.of(LOGGER));
+ ExecutorsUtils.shutdownExecutorService(this.tokenRenewExecutor,
Optional.of(LOGGER));
+ }
+
+ protected void scheduleTokenRenewTask() {
+ LOGGER.info(String.format("Scheduling the token renew task with an
interval of %d minute(s)",
+ this.tokenRenewIntervalInMinutes));
+
+ this.scheduledTokenRenewTask = Optional.<ScheduledFuture<?>>of(
+ this.tokenRenewExecutor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ renewDelegationToken();
+ } catch (IOException ioe) {
+ LOGGER.error("Failed to renew delegation token", ioe);
+ throw Throwables.propagate(ioe);
+ } catch (InterruptedException ie) {
+ LOGGER.error("Token renew task has been interrupted");
+ Thread.currentThread().interrupt();
+ }
+ }
+ }, this.tokenRenewIntervalInMinutes, this.tokenRenewIntervalInMinutes,
TimeUnit.MINUTES));
+ }
+
+ //The whole logic for each re-login
+ public void loginAndScheduleTokenRenewal() {
+ try {
+ // Cancel the currently scheduled token renew task
+ if (scheduledTokenRenewTask.isPresent() &&
scheduledTokenRenewTask.get().cancel(true)) {
+ LOGGER.info("Cancelled the token renew task");
+ }
+
+ login();
+ if (firstLogin) {
+ firstLogin = false;
+ }
+
+ // Re-schedule the token renew task after re-login
+ scheduleTokenRenewTask();
+ } catch (IOException | InterruptedException ioe) {
+ LOGGER.error("Failed to login from keytab", ioe);
+ throw Throwables.propagate(ioe);
Review Comment:
let's directly wrap w/ `RuntimeException`; see -
https://github.com/google/guava/wiki/Why-we-deprecated-Throwables.propagate
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/helix/HelixClusterLifecycleManager.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn.helix;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.Message;
+
+import com.google.common.base.Throwables;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.GobblinHelixConstants;
+import org.apache.gobblin.cluster.GobblinHelixMessagingService;
+import org.apache.gobblin.cluster.HelixUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class HelixClusterLifecycleManager implements Closeable {
Review Comment:
understandable, since you didn't have time to fully refine it, but please add
javadoc before check-in
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/helix/HelixClusterLifecycleManager.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn.helix;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.Message;
+
+import com.google.common.base.Throwables;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.GobblinHelixConstants;
+import org.apache.gobblin.cluster.GobblinHelixMessagingService;
+import org.apache.gobblin.cluster.HelixUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class HelixClusterLifecycleManager implements Closeable {
+ private final Config config;
+ private final String helixInstanceName;
+
+ @Getter
+ private final AtomicBoolean isApplicationRunningFlag;
Review Comment:
do you really want to provide callers the `AtomicBoolean` so they can adjust
it, or instead just give them:
a.) a read-only `boolean` of the current value?
OR
b.) a `setIsNowRunning()` method
?
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ * The super class for key management
+ * This class uses a scheduled task to do re-login to re-fetch token on a
+ * configurable schedule. It also uses a second scheduled task
+ * to renew the delegation token after each login. Both the re-login
interval and the token
+ * renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
+ */
+public abstract class AbstractAppSecurityManager extends AbstractIdleService {
+
+ protected Logger LOGGER = LoggerFactory.getLogger(this.getClass().getName());
+
+ protected Config config;
+
+ protected final FileSystem fs;
+ protected final Path tokenFilePath;
+
+ protected Credentials credentials = new Credentials();
+ private final long loginIntervalInMinutes;
+ private final long tokenRenewIntervalInMinutes;
+ private final ScheduledExecutorService loginExecutor;
+ private final ScheduledExecutorService tokenRenewExecutor;
+
+ private Optional<ScheduledFuture<?>> scheduledTokenRenewTask =
Optional.absent();
+
+ // This flag is used to tell if this is the first login. If yes, no token
updated message will be
+ // sent to the controller and the participants as they may not be up running
yet. The first login
+ // happens after this class starts up so the token gets regularly refreshed
before the next login.
+ protected volatile boolean firstLogin = true;
+
+ public AbstractAppSecurityManager(Config config, FileSystem fs, Path
tokenFilePath) {
+ this.config = config;
+ this.fs = fs;
+ this.tokenFilePath = tokenFilePath;
+ this.fs.makeQualified(tokenFilePath);
+ this.loginIntervalInMinutes = ConfigUtils.getLong(config,
GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES,
+ GobblinYarnConfigurationKeys.DEFAULT_LOGIN_INTERVAL_IN_MINUTES);
+ this.tokenRenewIntervalInMinutes = ConfigUtils.getLong(config,
GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES,
+ GobblinYarnConfigurationKeys.DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES);
+
+ this.loginExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
Optional.of("KeytabReLoginExecutor")));
+ this.tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
Optional.of("TokenRenewExecutor")));
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ LOGGER.info("Starting the " + this.getClass().getSimpleName());
+
+ LOGGER.info(
+ String.format("Scheduling the login task with an interval of %d
minute(s)", this.loginIntervalInMinutes));
+
+ // Schedule the Kerberos re-login task
+ this.loginExecutor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ loginAndScheduleTokenRenewal();
+ }catch(Exception e){
+ LOGGER.error("Error during login, will continue the thread and try
next time.");
+ }
+ }
+ }, this.loginIntervalInMinutes, this.loginIntervalInMinutes,
TimeUnit.MINUTES);
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ LOGGER.info("Stopping the " + this.getClass().getSimpleName());
+
+ if (this.scheduledTokenRenewTask.isPresent()) {
+ this.scheduledTokenRenewTask.get().cancel(true);
+ }
+ ExecutorsUtils.shutdownExecutorService(this.loginExecutor,
Optional.of(LOGGER));
+ ExecutorsUtils.shutdownExecutorService(this.tokenRenewExecutor,
Optional.of(LOGGER));
+ }
+
+ protected void scheduleTokenRenewTask() {
+ LOGGER.info(String.format("Scheduling the token renew task with an
interval of %d minute(s)",
+ this.tokenRenewIntervalInMinutes));
+
+ this.scheduledTokenRenewTask = Optional.<ScheduledFuture<?>>of(
+ this.tokenRenewExecutor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ renewDelegationToken();
+ } catch (IOException ioe) {
+ LOGGER.error("Failed to renew delegation token", ioe);
+ throw Throwables.propagate(ioe);
+ } catch (InterruptedException ie) {
+ LOGGER.error("Token renew task has been interrupted");
+ Thread.currentThread().interrupt();
+ }
+ }
+ }, this.tokenRenewIntervalInMinutes, this.tokenRenewIntervalInMinutes,
TimeUnit.MINUTES));
+ }
+
+ //The whole logic for each re-login
+ public void loginAndScheduleTokenRenewal() {
+ try {
+ // Cancel the currently scheduled token renew task
+ if (scheduledTokenRenewTask.isPresent() &&
scheduledTokenRenewTask.get().cancel(true)) {
Review Comment:
method should be `synchronized` along w/ `scheduleTokenRenewTask`
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractAppSecurityManager.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+import static
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+
+/**
+ * <p>
+ * The super class for key management
+ * This class uses a scheduled task to do re-login to re-fetch token on a
+ * configurable schedule. It also uses a second scheduled task
+ * to renew the delegation token after each login. Both the re-login
interval and the token
+ * renewing interval are configurable.
+ * </p>
+ * @author Zihan Li
+ */
+public abstract class AbstractAppSecurityManager extends AbstractIdleService {
+
+ protected Logger LOGGER = LoggerFactory.getLogger(this.getClass().getName());
+
+ protected Config config;
+
+ protected final FileSystem fs;
+ protected final Path tokenFilePath;
+
+ protected Credentials credentials = new Credentials();
+ private final long loginIntervalInMinutes;
+ private final long tokenRenewIntervalInMinutes;
+ private final ScheduledExecutorService loginExecutor;
+ private final ScheduledExecutorService tokenRenewExecutor;
+
+ private Optional<ScheduledFuture<?>> scheduledTokenRenewTask =
Optional.absent();
+
+ // This flag is used to tell if this is the first login. If yes, no token
updated message will be
+ // sent to the controller and the participants as they may not be up running
yet. The first login
+ // happens after this class starts up so the token gets regularly refreshed
before the next login.
+ protected volatile boolean firstLogin = true;
+
+ public AbstractAppSecurityManager(Config config, FileSystem fs, Path
tokenFilePath) {
+ this.config = config;
+ this.fs = fs;
+ this.tokenFilePath = tokenFilePath;
+ this.fs.makeQualified(tokenFilePath);
+ this.loginIntervalInMinutes = ConfigUtils.getLong(config,
GobblinYarnConfigurationKeys.LOGIN_INTERVAL_IN_MINUTES,
+ GobblinYarnConfigurationKeys.DEFAULT_LOGIN_INTERVAL_IN_MINUTES);
+ this.tokenRenewIntervalInMinutes = ConfigUtils.getLong(config,
GobblinYarnConfigurationKeys.TOKEN_RENEW_INTERVAL_IN_MINUTES,
+ GobblinYarnConfigurationKeys.DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES);
+
+ this.loginExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
Optional.of("KeytabReLoginExecutor")));
+ this.tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
Optional.of("TokenRenewExecutor")));
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ LOGGER.info("Starting the " + this.getClass().getSimpleName());
+
+ LOGGER.info(
+ String.format("Scheduling the login task with an interval of %d
minute(s)", this.loginIntervalInMinutes));
+
+ // Schedule the Kerberos re-login task
+ this.loginExecutor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ loginAndScheduleTokenRenewal();
+ }catch(Exception e){
+ LOGGER.error("Error during login, will continue the thread and try
next time.");
+ }
+ }
+ }, this.loginIntervalInMinutes, this.loginIntervalInMinutes,
TimeUnit.MINUTES);
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ LOGGER.info("Stopping the " + this.getClass().getSimpleName());
+
+ if (this.scheduledTokenRenewTask.isPresent()) {
+ this.scheduledTokenRenewTask.get().cancel(true);
+ }
+ ExecutorsUtils.shutdownExecutorService(this.loginExecutor,
Optional.of(LOGGER));
+ ExecutorsUtils.shutdownExecutorService(this.tokenRenewExecutor,
Optional.of(LOGGER));
+ }
+
+ protected void scheduleTokenRenewTask() {
+ LOGGER.info(String.format("Scheduling the token renew task with an
interval of %d minute(s)",
+ this.tokenRenewIntervalInMinutes));
+
+ this.scheduledTokenRenewTask = Optional.<ScheduledFuture<?>>of(
+ this.tokenRenewExecutor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ renewDelegationToken();
+ } catch (IOException ioe) {
+ LOGGER.error("Failed to renew delegation token", ioe);
+ throw Throwables.propagate(ioe);
+ } catch (InterruptedException ie) {
+ LOGGER.error("Token renew task has been interrupted");
+ Thread.currentThread().interrupt();
+ }
+ }
+ }, this.tokenRenewIntervalInMinutes, this.tokenRenewIntervalInMinutes,
TimeUnit.MINUTES));
+ }
+
+ //The whole logic for each re-login
+ public void loginAndScheduleTokenRenewal() {
+ try {
+ // Cancel the currently scheduled token renew task
+ if (scheduledTokenRenewTask.isPresent() &&
scheduledTokenRenewTask.get().cancel(true)) {
+ LOGGER.info("Cancelled the token renew task");
+ }
+
+ login();
+ if (firstLogin) {
+ firstLogin = false;
+ }
+
+ // Re-schedule the token renew task after re-login
+ scheduleTokenRenewTask();
+ } catch (IOException | InterruptedException ioe) {
+ LOGGER.error("Failed to login from keytab", ioe);
+ throw Throwables.propagate(ioe);
+ }
+ }
+
+ /**
+ * Write the current credentials to the token file.
+ */
+ protected synchronized void writeDelegationTokenToFile(Credentials cred)
throws IOException {
+
+ if (this.fs.exists(this.tokenFilePath)) {
+ LOGGER.info("Deleting existing token file " + this.tokenFilePath);
+ this.fs.delete(this.tokenFilePath, false);
+ }
+
+ LOGGER.debug("creating new token file {} with 644 permission.",
this.tokenFilePath);
Review Comment:
looks more like `setPermission` to `0600`
Issue Time Tracking
-------------------
Worklog Id: (was: 908975)
Time Spent: 0.5h (was: 20m)
> Make a Gobblin yarn app launcher that does not depend on Helix
> --------------------------------------------------------------
>
> Key: GOBBLIN-2004
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2004
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Matthew Ho
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)