This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 94c40dc95 [GOBBLIN-1743] Ensure GobblinTaskRunner works without Yarn use (#3602)
94c40dc95 is described below
commit 94c40dc950e57b32cc5c14dfaddddb4538df2709
Author: umustafi <[email protected]>
AuthorDate: Mon Nov 21 11:31:43 2022 -0800
[GOBBLIN-1743] Ensure GobblinTaskRunner works without Yarn use (#3602)
* [GOBBLIN-1743] Ensure GobblinTaskRunner works without Yarn use
* fix log message
* codestyle fix
* move log into right code block
* fix white space changes
* fix another white space
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../apache/gobblin/cluster/GobblinTaskRunner.java | 36 +++++++++++++---------
1 file changed, 21 insertions(+), 15 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 610682aa8..ad6ab7157 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -544,23 +544,28 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
private void addInstanceTags() {
HelixManager receiverManager = getReceiverManager();
if (receiverManager.isConnected()) {
- // The helix instance associated with this container should be
consistent on helix tag
- List<String> existedTags = receiverManager.getClusterManagmentTool()
- .getInstanceConfig(this.clusterName,
this.helixInstanceName).getTags();
- Set<String> desiredTags = new HashSet<>(
- ConfigUtils.getStringList(this.clusterConfig,
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY));
- if (!desiredTags.isEmpty()) {
- // Remove tag assignments for the current Helix instance from a
previous run
- for (String tag : existedTags) {
- if (!desiredTags.contains(tag))
-
receiverManager.getClusterManagmentTool().removeInstanceTag(this.clusterName,
this.helixInstanceName, tag);
- logger.info("Removed unrelated helix tag {} for instance {}", tag,
this.helixInstanceName);
+ try {
+ Set<String> desiredTags = new HashSet<>(
+ ConfigUtils.getStringList(this.clusterConfig,
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY));
+ if (!desiredTags.isEmpty()) {
+ // The helix instance associated with this container should be
consistent on helix tag
+ List<String> existedTags = receiverManager.getClusterManagmentTool()
+ .getInstanceConfig(this.clusterName,
this.helixInstanceName).getTags();
+ // Remove tag assignments for the current Helix instance from a
previous run
+ for (String tag : existedTags) {
+ if (!desiredTags.contains(tag)) {
+
receiverManager.getClusterManagmentTool().removeInstanceTag(this.clusterName,
this.helixInstanceName, tag);
+ logger.info("Removed unrelated helix tag {} for instance {}",
tag, this.helixInstanceName);
+ }
+ }
+ desiredTags.forEach(desiredTag ->
receiverManager.getClusterManagmentTool()
+ .addInstanceTag(this.clusterName, this.helixInstanceName,
desiredTag));
+ logger.info("Actual tags binding " +
receiverManager.getClusterManagmentTool()
+ .getInstanceConfig(this.clusterName,
this.helixInstanceName).getTags());
}
- desiredTags.forEach(desiredTag ->
receiverManager.getClusterManagmentTool()
- .addInstanceTag(this.clusterName, this.helixInstanceName,
desiredTag));
+ } catch (HelixException e) {
+ logger.warn("Error with Helix getting instance config tags used in YARN cluster configuration. Ensure YARN is being used. Will ignore and attempt to move on {}", e);
}
- logger.info("Actual tags binding " +
receiverManager.getClusterManagmentTool()
- .getInstanceConfig(this.clusterName,
this.helixInstanceName).getTags());
}
}
@@ -599,6 +604,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
private Optional<ContainerMetrics> buildContainerMetrics() {
Properties properties = ConfigUtils.configToProperties(this.clusterConfig);
if (GobblinMetrics.isEnabled(properties)) {
+ logger.info("Container metrics are enabled");
return Optional.of(ContainerMetrics
.get(ConfigUtils.configToState(clusterConfig), this.applicationName,
this.taskRunnerId));
} else {