Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 3bc3d3691 -> 6232b416a
[GOBBLIN-533] upgrade helix to 0.8.1

Closes #2396 from arjun4084346/helixUpgrade

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6232b416
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6232b416
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6232b416

Branch: refs/heads/master
Commit: 6232b416a59613db32f6c08cde703c71555d37b8
Parents: 3bc3d36
Author: Arjun <[email protected]>
Authored: Fri Jul 13 11:33:11 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Fri Jul 13 11:33:11 2018 -0700

----------------------------------------------------------------------
 .../gobblin/aws/CloudInitScriptBuilder.java     |  7 +-
 .../gobblin/aws/CloudInitScriptBuilderTest.java | 31 +++++++-
 ...blinHelixDistributeJobExecutionLauncher.java |  2 +-
 .../cluster/GobblinHelixJobLauncher.java        |  2 +-
 .../gobblin/cluster/GobblinHelixTaskDriver.java | 80 --------------------
 .../org/apache/gobblin/cluster/HelixUtils.java  | 25 +-----
 gradle/scripts/dependencyDefinitions.gradle     |  2 +-
 7 files changed, 35 insertions(+), 114 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-aws/src/main/java/org/apache/gobblin/aws/CloudInitScriptBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-aws/src/main/java/org/apache/gobblin/aws/CloudInitScriptBuilder.java b/gobblin-aws/src/main/java/org/apache/gobblin/aws/CloudInitScriptBuilder.java
index fa65f5b..8da15aa 100644
--- a/gobblin-aws/src/main/java/org/apache/gobblin/aws/CloudInitScriptBuilder.java
+++ b/gobblin-aws/src/main/java/org/apache/gobblin/aws/CloudInitScriptBuilder.java
@@ -53,6 +53,7 @@ public class CloudInitScriptBuilder {
   private static final String NFS_SERVER_START_CMD = "/etc/init.d/nfs start";
   private static final String NFS_EXPORT_FS_CMD = 
"exportfs -a"; private static final String NFS_TYPE_4 = "nfs4"; + public static final String BASH = "#!/bin/bash"; /*** * This method generates the script that would be executed by cloud-init module in EC2 instance @@ -87,7 +88,7 @@ public class CloudInitScriptBuilder { String masterS3ConfUri, String masterS3ConfFiles, String masterS3JarsUri, String masterS3JarsFiles, String masterJarsDir, String masterJvmMemory, Optional<String> masterJvmArgs, Optional<String> gobblinVersion) { - final StringBuilder cloudInitCmds = new StringBuilder().append("#!/bin/bash").append("\n"); + final StringBuilder cloudInitCmds = new StringBuilder().append(BASH).append("\n"); final String clusterMasterClassName = GobblinAWSClusterManager.class.getSimpleName(); @@ -196,7 +197,7 @@ public class CloudInitScriptBuilder { String workerS3ConfUri, String workerS3ConfFiles, String workerS3JarsUri, String workerS3JarsFiles, String workerJarsDir, String workerJvmMemory, Optional<String> workerJvmArgs, Optional<String> gobblinVersion) { - final StringBuilder cloudInitCmds = new StringBuilder().append("#!/bin/bash").append("\n"); + final StringBuilder cloudInitCmds = new StringBuilder().append(BASH).append("\n"); final String clusterWorkerClassName = GobblinAWSTaskRunner.class.getSimpleName(); @@ -262,7 +263,7 @@ public class CloudInitScriptBuilder { .append(clusterWorkerClassName).append(".") .append("$pi").append(".") .append(CloudInitScriptBuilder.STDERR); - cloudInitCmds.append(launchGobblinClusterWorkerCmd); + cloudInitCmds.append(launchGobblinClusterWorkerCmd).append("\n"); final String cloudInitScript = cloudInitCmds.toString(); LOGGER.info("Cloud-init script for worker node: " + cloudInitScript); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-aws/src/test/java/org/apache/gobblin/aws/CloudInitScriptBuilderTest.java ---------------------------------------------------------------------- diff --git 
a/gobblin-aws/src/test/java/org/apache/gobblin/aws/CloudInitScriptBuilderTest.java b/gobblin-aws/src/test/java/org/apache/gobblin/aws/CloudInitScriptBuilderTest.java index a601991..cb1c70f 100644 --- a/gobblin-aws/src/test/java/org/apache/gobblin/aws/CloudInitScriptBuilderTest.java +++ b/gobblin-aws/src/test/java/org/apache/gobblin/aws/CloudInitScriptBuilderTest.java @@ -17,11 +17,16 @@ package org.apache.gobblin.aws; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; + import org.apache.commons.codec.binary.Base64; import org.apache.commons.io.IOUtils; import org.junit.BeforeClass; import org.testng.Assert; import org.testng.annotations.Test; +import org.testng.util.Strings; import com.google.common.base.Optional; @@ -64,10 +69,8 @@ public class CloudInitScriptBuilderTest { @BeforeClass public void setup() throws Exception { - this.expectedMasterCloudInitScript = IOUtils.toString(GobblinAWSClusterLauncherTest.class.getClassLoader() - .getResourceAsStream(MASTER_CLOUD_INIT_SCRIPT), "UTF-8"); - this.expectedWorkerCloudInitScript = IOUtils.toString(GobblinAWSClusterLauncherTest.class.getClassLoader() - .getResourceAsStream(WORKER_CLOUD_INIT_SCRIPT), "UTF-8"); + this.expectedMasterCloudInitScript = loadFile(MASTER_CLOUD_INIT_SCRIPT); + this.expectedWorkerCloudInitScript = loadFile(WORKER_CLOUD_INIT_SCRIPT); } @Test @@ -93,4 +96,24 @@ public class CloudInitScriptBuilderTest { Assert.assertEquals(decodedScript, this.expectedWorkerCloudInitScript, "Worker launcher cloud-init script not built as expected"); } + + /** + * loads the given file into a string, ignoring the comments, but considering "#!/bin/bash" + * @param file file to read + * @return file content as a string + * @throws IOException + */ + private String loadFile(String file) throws IOException { + StringBuilder sb = new StringBuilder(); + + List<String> lines = IOUtils + .readLines(new 
InputStreamReader(GobblinAWSClusterLauncherTest.class.getClassLoader().getResourceAsStream(file), "UTF-8")); + + for (String line : lines) { + if (line.equals(CloudInitScriptBuilder.BASH) || (!line.startsWith("#") && !Strings.isNullOrEmpty(line))) { + sb.append(line).append("\n"); + } + } + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java index 5800e8d..87613a6 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java @@ -231,7 +231,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher timeoutEnabled? 
Optional.of(timeoutInSeconds) : Optional.empty()); return getResultFromUserContent(); } catch (TimeoutException te) { - HelixUtils.helixTaskDriverWaitToStop(helixManager, helixTaskDriver, planningName, 10L); + helixTaskDriver.waitToStop(planningName, 10L); this.helixTaskDriver.delete(planningName); this.helixTaskDriver.resume(planningName); log.info("stopped the queue, deleted the job"); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index 231575e..39c6e5b 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -368,7 +368,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { this.jobContext.getJobId(), timeoutEnabled? 
Optional.of(timeoutInSeconds) : Optional.empty()); } catch (TimeoutException te) { - HelixUtils.helixTaskDriverWaitToStop(helixManager, helixTaskDriver, helixQueueName, 10L); + helixTaskDriver.waitToStop(helixQueueName, 10L); try { cancelJob(this.jobListener); } catch (JobException e) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java deleted file mode 100644 index ebe2b52..0000000 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gobblin.cluster; - -import org.apache.helix.ConfigAccessor; -import org.apache.helix.HelixAdmin; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixException; -import org.apache.helix.HelixManager; -import org.apache.helix.ZNRecord; -import org.apache.helix.store.HelixPropertyStore; -import org.apache.helix.task.TargetState; -import org.apache.helix.task.TaskDriver; -import org.apache.helix.task.WorkflowConfig; -import org.apache.helix.task.WorkflowContext; - -/** - * #HELIX-0.6.7-WORKAROUND - * Replacement TaskDriver methods to workaround bugs and changes in behavior for the 0.6.7 upgrade - */ -public class GobblinHelixTaskDriver { - private final TaskDriver _taskDriver; - - public GobblinHelixTaskDriver(HelixManager manager) { - this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(), manager - .getConfigAccessor(), manager.getHelixPropertyStore(), manager.getClusterName()); - } - - public GobblinHelixTaskDriver(HelixAdmin admin, HelixDataAccessor accessor, ConfigAccessor cfgAccessor, - HelixPropertyStore<ZNRecord> propertyStore, String clusterName) { - _taskDriver = new TaskDriver(admin, accessor, cfgAccessor, propertyStore, clusterName); - } - - /** - * Delete the workflow - * - * @param workflow The workflow name - * @param timeout The timeout for deleting the workflow/queue in seconds - */ - public void deleteWorkflow(String workflow, long timeout) throws InterruptedException { - WorkflowConfig workflowConfig = _taskDriver.getWorkflowConfig(workflow); - - // set the target state if not already set - if (workflowConfig != null && workflowConfig.getTargetState() != TargetState.DELETE) { - _taskDriver.delete(workflow); - } - - long endTime = System.currentTimeMillis() + (timeout * 1000); - - // check for completion of deletion request - while (System.currentTimeMillis() <= endTime) { - WorkflowContext workflowContext = _taskDriver.getWorkflowContext(workflow); - - if (workflowContext != null) { - 
Thread.sleep(1000); - } else { - // Successfully deleted - return; - } - } - - // Failed to complete deletion within timeout - throw new HelixException(String - .format("Fail to delete the workflow/queue %s within %d seconds.", workflow, timeout)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java index 0c5dbec..ea4e6f7 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java @@ -101,8 +101,7 @@ public class HelixUtils { // If the queue is present, but in delete state then wait for cleanup before recreating the queue if (workflowConfig != null && workflowConfig.getTargetState() == TargetState.DELETE) { - GobblinHelixTaskDriver gobblinHelixTaskDriver = new GobblinHelixTaskDriver(helixManager); - gobblinHelixTaskDriver.deleteWorkflow(queueName, jobQueueDeleteTimeoutSeconds); + new TaskDriver(helixManager).deleteAndWaitForCompletion(queueName, jobQueueDeleteTimeoutSeconds); // if we get here then the workflow was successfully deleted workflowConfig = null; } @@ -147,26 +146,4 @@ public class HelixUtils { throw new TimeoutException("task driver wait time [" + timeoutInSeconds + " sec] is expired."); } - - /** - * Because fix https://github.com/apache/helix/commit/ae8e8e2ef37f48d782fc12f85ca97728cf2b70c4 - * is not available in currently used version 0.6.9 - */ - public static void helixTaskDriverWaitToStop( - HelixManager helixManager, - TaskDriver helixTaskDriver, - String queueName, - long timeoutInSeconds) throws InterruptedException { - helixTaskDriver.stop(queueName); - long endTime = System.currentTimeMillis() + 
timeoutInSeconds*1000; - while (System.currentTimeMillis() <= endTime) { - WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, queueName); - if (workflowContext == null || workflowContext.getWorkflowState() - .equals(org.apache.helix.task.TaskState.IN_PROGRESS)) { - Thread.sleep(1000); - } else { - log.info("Successfully stopped the queue"); - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gradle/scripts/dependencyDefinitions.gradle ---------------------------------------------------------------------- diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle index ad1c1cf..c1f53db 100644 --- a/gradle/scripts/dependencyDefinitions.gradle +++ b/gradle/scripts/dependencyDefinitions.gradle @@ -61,7 +61,7 @@ ext.externalDependency = [ "hadoopYarnMiniCluster": "org.apache.hadoop:hadoop-minicluster:" + hadoopVersion, "hadoopAnnotations": "org.apache.hadoop:hadoop-annotations:" + hadoopVersion, "hadoopAws": "org.apache.hadoop:hadoop-aws:2.6.0", - "helix": "org.apache.helix:helix-core:0.6.9", + "helix": "org.apache.helix:helix-core:0.8.1", "hiveCommon": "org.apache.hive:hive-common:" + hiveVersion, "hiveService": "org.apache.hive:hive-service:" + hiveVersion, "hiveJdbc": "org.apache.hive:hive-jdbc:" + hiveVersion,
