[ https://issues.apache.org/jira/browse/GOBBLIN-1031?focusedWorklogId=383308&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-383308 ]
ASF GitHub Bot logged work on GOBBLIN-1031: ------------------------------------------- Author: ASF GitHub Bot Created on: 07/Feb/20 01:21 Start Date: 07/Feb/20 01:21 Worklog Time Spent: 10m Work Description: sv2000 commented on pull request #2873: [GOBBLIN-1031]Gobblin-on-Yarn locally running Azkaban job skeleton URL: https://github.com/apache/incubator-gobblin/pull/2873#discussion_r376166058 ########## File path: gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/EmbeddedGobblinYarnAppLauncher.java ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.azkaban; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.lang.reflect.Field; +import java.util.Map; + +import org.apache.gobblin.testing.AssertWithBackoff; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.testng.collections.Lists; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Closer; + +import lombok.extern.slf4j.Slf4j; + + +/** + * Given a set up Azkaban job configuration, launch the Gobblin-on-Yarn job in a semi-embedded mode: + * - Uses external Kafka cluster and requires external Zookeeper(Non-embedded TestingServer) to be set up. + * The Kafka Cluster was intentionally set to be external due to the data availability. External ZK was unintentional + * as the helix version (0.9) being used cannot finish state transition in the Embedded ZK. + * TODO: Adding embedded Kafka cluster and set golden datasets for data-validation. + * - Uses MiniYARNCluster so YARN components don't have to be installed. + */ +@Slf4j +public class EmbeddedGobblinYarnAppLauncher extends AzkabanJobRunner { + public static final String DYNAMIC_CONF_PATH = "dynamic.conf"; + public static final String YARN_SITE_XML_PATH = "yarn-site.xml"; + private static String zkString = ""; + private static String fileAddress = ""; + + private static void setup(String[] args) + throws Exception { + // Parsing zk-string + Preconditions.checkArgument(args.length == 1); + zkString = args[0]; + + // Initialize necessary external components: Yarn and Helix + Closer closer = Closer.create(); + + // Set java home in environment since it isn't set on some systems + String javaHome = System.getProperty("java.home"); + setEnv("JAVA_HOME", javaHome); + + final YarnConfiguration clusterConf = new YarnConfiguration(); + clusterConf.set("yarn.resourcemanager.connect.max-wait.ms", "10000"); + clusterConf.set("yarn.nodemanager.resource.memory-mb", "512"); + clusterConf.set("yarn.scheduler.maximum-allocation-mb", "1024"); + + MiniYARNCluster miniYARNCluster = closer.register(new MiniYARNCluster("TestCluster", 1, 1, 1)); + miniYARNCluster.init(clusterConf); + miniYARNCluster.start(); + + // YARN client should not be started before the Resource Manager is up + AssertWithBackoff.create().logger(log).timeoutMs(10000).assertTrue(new Predicate<Void>() { + @Override + public boolean apply(Void input) { + return !clusterConf.get(YarnConfiguration.RM_ADDRESS).contains(":0"); + } + }, "Waiting for RM"); + + try (PrintWriter pw = new PrintWriter(DYNAMIC_CONF_PATH, "UTF-8")) { + File dir = new File("target/dummydir"); + + // dummy directory specified in configuration + if (!dir.mkdir()) { + log.error("The dummy folder's creation is not successful"); + } + dir.deleteOnExit(); + + pw.println("gobblin.cluster.zk.connection.string=\"" + zkString + "\""); + pw.println("jobconf.fullyQualifiedPath=\"" + dir.getAbsolutePath() + "\""); + } + + // YARN config is dynamic and needs to be passed to other processes + try (OutputStream os = new FileOutputStream(new File(YARN_SITE_XML_PATH))) { + clusterConf.writeXml(os); + } + + /** Have to pass the same yarn-site.xml to the GobblinYarnAppLauncher to initialize Yarn Client. */ + fileAddress = new File(YARN_SITE_XML_PATH).getAbsolutePath(); + } + + public static void setEnv(String key, String value) { Review comment: Does this need to be public? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 383308) Time Spent: 2h (was: 1h 50m) > Making Gobblin-Streaming runnable with Azkaban configuration locally > -------------------------------------------------------------------- > > Key: GOBBLIN-1031 > URL: https://issues.apache.org/jira/browse/GOBBLIN-1031 > Project: Apache Gobblin > Issue Type: Improvement > Reporter: Lei Sun > Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)