This is an automated email from the ASF dual-hosted git repository. suvasude pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new c02c293 [GOBBLIN-1031] Gobblin-on-Yarn locally running Azkaban job skeleton c02c293 is described below commit c02c29327eae10d1f79efbf5564053d58a280b93 Author: autumnust <le...@linkedin.com> AuthorDate: Sat Feb 22 11:58:13 2020 -0800 [GOBBLIN-1031] Gobblin-on-Yarn locally running Azkaban job skeleton Closes #2873 from autumnust/GobblinAppMasterTest --- .../gobblin/cluster/GobblinClusterManager.java | 4 +- .../gobblin/cluster/GobblinHelixJobLauncher.java | 14 ++- gobblin-modules/gobblin-azkaban/build.gradle | 3 + .../AzkabanGobblinLocalYarnAppLauncher.java | 49 ++++++++ .../azkaban/AzkabanGobblinYarnAppLauncher.java | 25 +++- .../apache/gobblin/azkaban/AzkabanJobRunner.java | 117 ++++++++++++++++++ .../azkaban/EmbeddedGobblinYarnAppLauncher.java | 135 +++++++++++++++++++++ .../src/main/resources/conf/gobblin_conf/app.btm | 32 +++++ .../conf/gobblin_conf/log4j-yarn.properties | 24 ++++ .../gobblin_jobs/kafka-hdfs-streaming-avro.conf | 88 ++++++++++++++ .../conf/jobs/kafka-streaming-on-yarn.job | 53 ++++++++ .../resources/conf/properties/common.properties | 63 ++++++++++ .../resources/conf/properties/local.properties | 21 ++++ .../extractor/extract/kafka/KafkaSource.java | 2 +- .../gobblin/runtime/AbstractJobLauncher.java | 2 +- .../org/apache/gobblin/util/logs/LogCopier.java | 5 +- .../yarn/AbstractYarnAppSecurityManager.java | 2 +- .../gobblin/yarn/GobblinYarnAppLauncher.java | 14 ++- .../gobblin/yarn/GobblinYarnConfigurationKeys.java | 1 + .../apache/gobblin/yarn/GobblinYarnLogSource.java | 6 +- .../gobblin/yarn/YarnAutoScalingManager.java | 2 + .../gobblin/yarn/GobblinYarnAppLauncherTest.java | 6 +- 22 files changed, 642 insertions(+), 26 deletions(-) diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java index 5900f64..9574a97 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java @@ -234,7 +234,9 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri } /** - * Configure Helix quota-based task scheduling + * Configure Helix quota-based task scheduling. + * This config controls the number of tasks that are concurrently assigned to a single Helix instance. + * Reference: https://helix.apache.org/0.9.1-docs/quota_scheduling.html */ @VisibleForTesting void configureHelixQuotaBasedTaskScheduling() { diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index cb2b434..418ce62 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -93,8 +93,8 @@ import org.apache.gobblin.util.SerializationUtils; * </p> * * <p> - * This class runs in the {@link GobblinClusterManager}. The actual task execution happens in the in the - * {@link GobblinTaskRunner}. + * This class is instantiated by the {@link GobblinHelixJobScheduler} on every job submission to launch the Gobblin job. + * The actual task execution happens in the {@link GobblinTaskRunner}, usually in a different process. * </p> * * @author Yinan Li @@ -164,7 +164,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps) .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef( new URI(appWorkDir.toUri().getScheme(), null, appWorkDir.toUri().getHost(), - appWorkDir.toUri().getPort(), null, null, null).toString())); + appWorkDir.toUri().getPort(), "/", null, null).toString())); this.stateStores = new StateStores(stateStoreJobConfig, appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME, appWorkDir, @@ -371,6 +371,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { this.jobListener = jobListener; boolean isLaunched = false; this.runningMap.putIfAbsent(this.jobContext.getJobName(), false); + + Throwable errorInJobLaunching = null; try { if (this.runningMap.replace(this.jobContext.getJobName(), false, true)) { LOGGER.info ("Job {} will be executed, add into running map.", this.jobContext.getJobId()); @@ -379,12 +381,16 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { } else { LOGGER.warn ("Job {} will not be executed because other jobs are still running.", this.jobContext.getJobId()); } + // TODO: Better error handling + } catch (Throwable t) { + errorInJobLaunching = t; } finally { if (isLaunched) { if (this.runningMap.replace(this.jobContext.getJobName(), true, false)) { LOGGER.info ("Job {} is done, remove from running map.", this.jobContext.getJobId()); } else { - throw new IllegalStateException("A launched job should have running state equal to true in the running map."); + throw errorInJobLaunching == null ? new IllegalStateException("A launched job should have running state equal to true in the running map.") + : new RuntimeException("Failure in launching job:", errorInJobLaunching); } } } diff --git a/gobblin-modules/gobblin-azkaban/build.gradle b/gobblin-modules/gobblin-azkaban/build.gradle index 2f1dde6..c529ace 100644 --- a/gobblin-modules/gobblin-azkaban/build.gradle +++ b/gobblin-modules/gobblin-azkaban/build.gradle @@ -28,6 +28,9 @@ dependencies { compile project(":gobblin-metrics-libs:gobblin-metrics") compile project(":gobblin-utility") compile project(":gobblin-yarn") + compile externalDependency.curatorFramework + compile externalDependency.hadoopYarnMiniCluster + compile externalDependency.curatorTest compile externalDependency.azkaban compile externalDependency.log4j diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinLocalYarnAppLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinLocalYarnAppLauncher.java new file mode 100644 index 0000000..53ccafa --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinLocalYarnAppLauncher.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.azkaban; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + + +/** + * An extension of {@link AzkabanGobblinYarnAppLauncher} for locally-running Azkaban instances since it provides + * capability of changing yarn-resource related configuration in the way that could work with lighter hardware. + */ +public class AzkabanGobblinLocalYarnAppLauncher extends AzkabanGobblinYarnAppLauncher { + public AzkabanGobblinLocalYarnAppLauncher(String jobId, Properties gobblinProps) + throws IOException { + super(jobId, gobblinProps); + } + + @Override + protected YarnConfiguration initYarnConf(Properties gobblinProps) { + YarnConfiguration yarnConfiguration = super.initYarnConf(gobblinProps); + if (gobblinProps.containsKey("yarn-site-address")) { + yarnConfiguration.addResource(new Path(gobblinProps.getProperty("yarn-site-address"))); + } else { + yarnConfiguration.set("yarn.resourcemanager.connect.max-wait.ms", "10000"); + yarnConfiguration.set("yarn.nodemanager.resource.memory-mb", "512"); + yarnConfiguration.set("yarn.scheduler.maximum-allocation-mb", "1024"); + } + return yarnConfiguration; + } +} diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java index 6fa7eab..4747e89 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java @@ -23,6 +23,9 @@ import java.util.Properties; import java.util.concurrent.TimeoutException; import org.apache.commons.io.FileUtils; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.yarn.GobblinYarnAppLauncher; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.log4j.Logger; @@ -31,10 +34,8 @@ import com.google.common.base.Charsets; import com.typesafe.config.Config; import com.typesafe.config.ConfigRenderOptions; -import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.yarn.GobblinYarnAppLauncher; - import azkaban.jobExecutor.AbstractJob; +import lombok.Getter; /** @@ -60,13 +61,25 @@ public class AzkabanGobblinYarnAppLauncher extends AbstractJob { private final GobblinYarnAppLauncher gobblinYarnAppLauncher; - public AzkabanGobblinYarnAppLauncher(String jobId, Properties props) throws IOException { + @Getter + private final YarnConfiguration yarnConfiguration; + + public AzkabanGobblinYarnAppLauncher(String jobId, Properties gobblinProps) throws IOException { super(jobId, LOGGER); - Config gobblinConfig = ConfigUtils.propertiesToConfig(props); + Config gobblinConfig = ConfigUtils.propertiesToConfig(gobblinProps); outputConfigToFile(gobblinConfig); - this.gobblinYarnAppLauncher = new GobblinYarnAppLauncher(gobblinConfig, new YarnConfiguration()); + yarnConfiguration = initYarnConf(gobblinProps); + + this.gobblinYarnAppLauncher = new GobblinYarnAppLauncher(gobblinConfig, this.yarnConfiguration); + } + + /** + * Extended class can override this method by providing their own YARN configuration. + */ + protected YarnConfiguration initYarnConf(Properties gobblinProps) { + return new YarnConfiguration(); } @Override diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobRunner.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobRunner.java new file mode 100644 index 0000000..9ec3e0e --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobRunner.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.azkaban; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import com.google.common.io.Files; + +import azkaban.jobExecutor.AbstractJob; +import azkaban.utils.Props; +import azkaban.utils.PropsUtils; +import lombok.RequiredArgsConstructor; + + +/** + * Runs Azkaban jobs locally. + * + * Usage: + * Extend the class, in the constructor pass the list of relative paths to all common properties files, as well as list + * of job files to run. + * + * Execution: + * java -cp ... {@link AzkabanJobRunner} class-name root-directory + * + * Where class-name is the extension of {@link AzkabanJobRunner} that should be executed, and root-directory is the + * root directory of the repository. + * + * @author Issac Buenrostro + */ +@RequiredArgsConstructor +public class AzkabanJobRunner { + private File baseDirectory = new File("."); + private final List<String> commonProps; + private final List<String> jobProps; + private final Map<String, String> overrides; + + static void doMain(Class<? extends AzkabanJobRunner> klazz, String[] args) + throws Exception { + AzkabanJobRunner runner = klazz.newInstance(); + if (args.length >= 1) { + runner.setBaseDirectory(new File(args[0])); + } + runner.run(); + } + + public static String getTempDirectory() { + File tmpDirectory = Files.createTempDir(); + tmpDirectory.deleteOnExit(); + return tmpDirectory.getAbsolutePath(); + } + + private void setBaseDirectory(File baseDirectory) { + this.baseDirectory = baseDirectory; + } + + public void run() + throws IOException { + + Props commonProps = new Props(); + for (String commonPropsFile : this.commonProps) { + commonProps = new Props(commonProps, new File(baseDirectory, commonPropsFile)); + } + + for (String jobFile : this.jobProps) { + File file = new File(baseDirectory, jobFile); + Props jobProps = new Props(new Props(commonProps, file), this.overrides); + jobProps = PropsUtils.resolveProps(jobProps); + try { + AbstractJob job = constructAbstractJob(file.getName(), jobProps); + job.run(); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + } + + private AbstractJob constructAbstractJob(String name, Props jobProps) { + try { + return (AbstractJob) jobProps.getClass("job.class").getConstructor(String.class, Props.class) + .newInstance(name, jobProps); + } catch (ReflectiveOperationException roe) { + try { + return (AbstractJob) jobProps.getClass("job.class").getConstructor(String.class, Properties.class) + .newInstance(name, propsToProperties(jobProps)); + } catch (ReflectiveOperationException exc) { + throw new RuntimeException(exc); + } + } + } + + private Properties propsToProperties(Props props) { + Properties properties = new Properties(); + for (String key : props.getKeySet()) { + properties.put(key, props.getString(key)); + } + return properties; + } +} diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/EmbeddedGobblinYarnAppLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/EmbeddedGobblinYarnAppLauncher.java new file mode 100644 index 0000000..0996342 --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/EmbeddedGobblinYarnAppLauncher.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.azkaban; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.lang.reflect.Field; +import java.util.Map; + +import org.apache.gobblin.testing.AssertWithBackoff; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.testng.collections.Lists; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Closer; + +import lombok.extern.slf4j.Slf4j; + + +/** + * Given a set up Azkaban job configuration, launch the Gobblin-on-Yarn job in a semi-embedded mode: + * - Uses external Kafka cluster and requires external Zookeeper(Non-embedded TestingServer) to be set up. + * The Kafka Cluster was intentionally set to be external due to the data availability. External ZK was unintentional + * as the helix version (0.9) being used cannot finish state transition in the Embedded ZK. + * TODO: Adding embedded Kafka cluster and set golden datasets for data-validation. + * - Uses MiniYARNCluster so YARN components don't have to be installed. + */ +@Slf4j +public class EmbeddedGobblinYarnAppLauncher extends AzkabanJobRunner { + public static final String DYNAMIC_CONF_PATH = "dynamic.conf"; + public static final String YARN_SITE_XML_PATH = "yarn-site.xml"; + private static String zkString = ""; + private static String fileAddress = ""; + + private static void setup(String[] args) + throws Exception { + // Parsing zk-string + Preconditions.checkArgument(args.length == 1); + zkString = args[0]; + + // Initialize necessary external components: Yarn and Helix + Closer closer = Closer.create(); + + // Set java home in environment since it isn't set on some systems + String javaHome = System.getProperty("java.home"); + setEnv("JAVA_HOME", javaHome); + + final YarnConfiguration clusterConf = new YarnConfiguration(); + clusterConf.set("yarn.resourcemanager.connect.max-wait.ms", "10000"); + clusterConf.set("yarn.nodemanager.resource.memory-mb", "512"); + clusterConf.set("yarn.scheduler.maximum-allocation-mb", "1024"); + + MiniYARNCluster miniYARNCluster = closer.register(new MiniYARNCluster("TestCluster", 1, 1, 1)); + miniYARNCluster.init(clusterConf); + miniYARNCluster.start(); + + // YARN client should not be started before the Resource Manager is up + AssertWithBackoff.create().logger(log).timeoutMs(10000).assertTrue(new Predicate<Void>() { + @Override + public boolean apply(Void input) { + return !clusterConf.get(YarnConfiguration.RM_ADDRESS).contains(":0"); + } + }, "Waiting for RM"); + + try (PrintWriter pw = new PrintWriter(DYNAMIC_CONF_PATH, "UTF-8")) { + File dir = new File("target/dummydir"); + + // dummy directory specified in configuration + if (!dir.mkdir()) { + log.error("The dummy folder's creation is not successful"); + } + dir.deleteOnExit(); + + pw.println("gobblin.cluster.zk.connection.string=\"" + zkString + "\""); + pw.println("jobconf.fullyQualifiedPath=\"" + dir.getAbsolutePath() + "\""); + } + + // YARN config is dynamic and needs to be passed to other processes + try (OutputStream os = new FileOutputStream(new File(YARN_SITE_XML_PATH))) { + clusterConf.writeXml(os); + } + + /** Have to pass the same yarn-site.xml to the GobblinYarnAppLauncher to initialize Yarn Client. */ + fileAddress = new File(YARN_SITE_XML_PATH).getAbsolutePath(); + } + + static void setEnv(String key, String value) { + try { + Map<String, String> env = System.getenv(); + Class<?> cl = env.getClass(); + Field field = cl.getDeclaredField("m"); + field.setAccessible(true); + Map<String, String> writableEnv = (Map<String, String>) field.get(env); + writableEnv.put(key, value); + } catch (Exception e) { + throw new IllegalStateException("Failed to set environment variable", e); + } + } + + public static void main(String[] args) + throws Exception { + setup(args); + AzkabanJobRunner.doMain(EmbeddedGobblinYarnAppLauncher.class, args); + } + + public EmbeddedGobblinYarnAppLauncher() { + super(Lists.newArrayList("gobblin-modules/gobblin-azkaban/src/main/resources/conf/properties/common.properties", + "gobblin-modules/gobblin-azkaban/src/main/resources/conf/properties/local.properties"), + Lists.newArrayList("gobblin-modules/gobblin-azkaban/src/main/resources/conf/jobs/kafka-streaming-on-yarn.job"), + ImmutableMap.of("yarn.resourcemanager.connect.max-wait.ms", "10000", "gobblin.cluster.zk.connection.string", + EmbeddedGobblinYarnAppLauncher.zkString, "gobblin.cluster.job.conf.path", + "gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs", "gobblin.yarn.conf.dir", + "gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_conf", "yarn-site-address", fileAddress)); + } +} diff --git a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_conf/app.btm b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_conf/app.btm new file mode 100644 index 0000000..59e6489 --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_conf/app.btm @@ -0,0 +1,32 @@ +RULE trace main entry +CLASS GobblinYarnTaskRunner +METHOD main +AT ENTRY +IF true +DO traceln("entering main") +ENDRULE + +RULE trace main exit +CLASS GobblinYarnTaskRunner +METHOD main +AT EXIT +IF true +DO traceln("exiting main") +ENDRULE + +RULE create countdown for converter +CLASS LiKafkaConsumerRecordToGenericRecordConverter +METHOD <init> +IF TRUE +DO createCountDown($0, 100000) +ENDRULE + +RULE trace converter entry +CLASS LiKafkaConsumerRecordToGenericRecordConverter +METHOD convertRecordImpl +AT ENTRY +IF countDown($0) +DO THROW new org.apache.gobblin.converter.DataConversionException("Injected exception") +# The following line can be used to kill the JVM +#DO traceln("killing JVM"), killJVM() +ENDRULE diff --git a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_conf/log4j-yarn.properties b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_conf/log4j-yarn.properties new file mode 100755 index 0000000..c5a1b7e --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_conf/log4j-yarn.properties @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +log4j.rootLogger=INFO, loggerId +log4j.appender.loggerId=org.apache.log4j.rolling.RollingFileAppender +log4j.appender.loggerId.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy +log4j.appender.loggerId.rollingPolicy.ActiveFileName=${gobblin.yarn.app.container.log.dir}/${gobblin.yarn.app.container.log.file} +log4j.appender.loggerId.rollingPolicy.FileNamePattern=${gobblin.yarn.app.container.log.dir}/${gobblin.yarn.app.container.log.file}.%d{yyyy-MM-dd-HH} +log4j.appender.loggerId.layout=org.apache.log4j.PatternLayout +log4j.appender.loggerId.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss z} %-5p [%t] %C %X{tableName} - %m%n \ No newline at end of file diff --git a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf new file mode 100644 index 0000000..71c64dc --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf @@ -0,0 +1,88 @@ +# A sample skeleton that reads from a Kafka topic and writes to Local FS in a streaming manner +# This sample job works with Embedded Gobblin using LocalJobLauncher instead of going through Yarn approach. + +job.name=LocalKafkaStreaming +job.group=streaming +job.description=A getting started example for Gobblin streaming to Kafka +job.lock.enabled=false + +# Flag to enable StreamModelTaskRunner +task.execution.synchronousExecutionModel=false +gobblin.task.is.single.branch.synchronous=true +taskexecutor.threadpool.size=1 +fork.record.queue.capacity=1 + +# Streaming-source specific configurations +source.class=org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource +gobblin.source.kafka.extractorType=org.apache.gobblin.prototype.kafka.KafkaAvroBinaryStreamingExtractor +kafka.workunit.size.estimator.type=CUSTOM +kafka.workunit.size.estimator.customizedType=org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.UnitKafkaWorkUnitSizeEstimator +kafka.workunit.packer.type=CUSTOM +kafka.workunit.packer.customizedType=org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker +extract.namespace=org.apache.gobblin.streaming.test + +# Configure watermark storage for streaming, using FS-based for local testing +streaming.watermarkStateStore.type=fs +streaming.watermark.commitIntervalMillis=2000 + +# Converter configs +# Default Generic Record based pipeline +recordStreamProcessor.classes="org.apache.gobblin.prototype.kafka.GenericRecordBasedKafkaSchemaChangeInjector,org.apache.gobblin.prototype.kafka.LiKafkaConsumerRecordToGenericRecordConverter" + +# Record-metadata decoration into main record +gobblin.kafka.converter.recordMetadata.enable=true + +# Writer configs +writer.builder.class=org.apache.gobblin.writer.AvroDataWriterBuilder +writer.partitioner.class=org.apache.gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner +writer.output.format=AVRO +writer.partition.columns=header.time +writer.partition.pattern=yyyy/MM/dd +writer.destination.type=HDFS +writer.staging.dir=/tmp/gobblin/streaming/writer-staging +writer.output.dir=/tmp/gobblin/streaming/writer-output +writer.closeOnFlush=true + +state.store.enabled=false + +# Publisher config +data.publisher.type=org.apache.gobblin.publisher.NoopPublisher +data.publisher.final.dir=/tmp/gobblin/kafka/publish +flush.data.publisher.class=org.apache.gobblin.prototype.kafka.TimePartitionedStreamingDataPublisher +###Config that controls intervals between flushes (and consequently, data publish) +stream.flush.interval.secs=60 + +### Following are Kafka Upstream related configurations +# Kafka source configurations +topic.whitelist= +bootstrap.with.offset=EARLIEST +source.kafka.fetchTimeoutMillis=3000 +kafka.consumer.maxPollRecords=100 + +#Kafka broker/schema registry configs +kafka.schema.registry.url= +kafka.schema.registry.class= +kafka.schemaRegistry.class= +kafka.schemaRegistry.url= +kafka.brokers= + +#Kafka SSL configs +security.protocol = SSL +ssl.protocol = TLS +ssl.trustmanager.algorithm = +ssl.keymanager.algorithm = +ssl.truststore.type = +ssl.truststore.location = +ssl.truststore.password = +ssl.keystore.type = +ssl.keystore.password = +ssl.key.password = +ssl.secure.random.implementation = +ssl.keystore.location=<path to your kafka certs> + +metrics.enabled=false + +# Only Required for Local-testing +kafka.consumer.runtimeIngestionPropsEnabled=false +# Limit single mappers for ease of debugging +mr.job.max.mappers = 1 \ No newline at end of file diff --git a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/jobs/kafka-streaming-on-yarn.job b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/jobs/kafka-streaming-on-yarn.job new file mode 100644 index 0000000..05a5fad --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/jobs/kafka-streaming-on-yarn.job @@ -0,0 +1,53 @@ +# This job starts up a Gobblin on YARN application master +# that runs jobs specified in the gobblin_jobs directory +job.name=kafka-streaming-on-yarn +gobblin.yarn.app.master.memory.mbs=1024 +gobblin.yarn.app.master.cores=1 +gobblin.yarn.app.report.interval.minutes=5 +gobblin.yarn.max.get.app.report.failures=4 +gobblin.yarn.email.notification.on.shutdown=false + +#Set the minimum number of containers to 1 for ease of observing. +gobblin.yarn.initial.containers=1 +gobblin.yarn.autoScaling.minContainers=1 + +gobblin.yarn.container.memory.mbs=1024 +gobblin.yarn.container.jvmMemoryOverheadMbs=600 +gobblin.yarn.container.cores=1 +gobblin.yarn.container.affinity.enabled=true +gobblin.yarn.helix.instance.max.retries=2 + +# Use config set in the Azkaban job for initializing the yarn containers +gobblin.yarn.akabanConfigOutputDir=./gobblin_config +gobblin.yarn.akabanConfigOutputPath=${gobblin.yarn.akabanConfigOutputDir}/application.conf + +gobblin.yarn.conf.dir=./conf +gobblin.yarn.lib.jars.dir=<path to where the gobblin libraries sit> +gobblin.yarn.app.master.files.local=<Path to yarn-site.xml>,${gobblin.yarn.conf.dir}/app.btm,${gobblin.yarn.conf.dir}/log4j-yarn.properties,${gobblin.yarn.akabanConfigOutputDir}/application.conf +gobblin.yarn.container.files.local=${gobblin.yarn.app.master.files.local} +gobblin.yarn.log.copier.disable.driver.copy=true +gobblin.yarn.app.master.serviceClasses=org.apache.gobblin.yarn.YarnAutoScalingManager + +# Cluster configuration properties +# gobblin.cluster.helix.cluster.name=${gobblin.yarn.app.name} + +# Job Configuration manager properties +gobblin.cluster.job.configuration.manager=org.apache.gobblin.cluster.FsJobConfigurationManager +gobblin.cluster.specConsumer.class=org.apache.gobblin.runtime.api.FsSpecConsumer + +# This config is to restrict assignment to one task per container +gobblin.cluster.helixTaskQuotaConfig=DEFAULT:1,UNUSED:39 + +job.execinfo.server.enabled=false +admin.server.enabled=false + +# File system URIs +# writer.fs.uri=${fs.uri} +# state.store.fs.uri=${fs.uri} + +# state.store.dir=${gobblin.yarn.work.dir}/state-store +# qualitychecker.row.err.file=${gobblin.yarn.work.dir}/err +job.lock.enabled=false +# job.lock.dir=${gobblin.yarn.work.dir}/locks + +writer.include.record.count.in.file.names=true \ No newline at end of file diff --git a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/properties/common.properties b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/properties/common.properties new file mode 100644 index 0000000..bc676ec --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/properties/common.properties @@ -0,0 +1,63 @@ +type=hadoopJava +job.class=org.apache.gobblin.azkaban.AzkabanGobblinLocalYarnAppLauncher +job.name=GobblinKafkaStreaming +job.group=GobblinKafkaStreaming +job.lock.enabled=false +encrypt.key.loc=/jobs/kafkaetl/gobblin/master +cleanup.staging.data.per.task=false +user.to.proxy=kafkaetl + +# This assumes all dependent jars are put into the 'lib' directory +job.jars=lib/*,resources/gobblin-site.xml +classpath=lib/*,/export/apps/hadoop/latest/lib/*,${hive.jars} + +# MR Configurations +jvmArgsMem=-XX:MaxPermSize=128M +jvmArgsGc=-XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+ScavengeBeforeFullGC -XX:+CMSScavengeBeforeRemark +jvmArgsGcLog=-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:gc.log +jvmArgsPerf=-XX:+UseCompressedOops +jvmArgsError=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/grid/a/mapred/tmp/ +jvm.args=${jvmArgsMem} ${jvmArgsGc} ${jvmArgsGcLog} ${jvmArgsPerf} ${jvmArgsError} + +# Common directories +gobblin.dataset.root.dir=${root.data.location}/tracking/streaming_parallel +# task.data.root.dir=${gobblin.dataset.root.dir}/_working/${root.project.name}/task-data +job.work.dir=/tmp/${root.project.name}/${job.name} +qualitychecker.row.err.file=${home.dir}/gobblin/${root.project.name}/err +job.lock.dir=${job.work.dir}/locks +state.store.dir=${home.dir}/gobblin/${root.project.name}/state-store +mr.job.root.dir=${home.dir}/gobblin/${root.project.name}/working +mr.job.lock.dir=${home.dir}/gobblin/${root.project.name}/locks +writer.staging.dir=${job.work.dir}/task-staging +writer.output.dir=${job.work.dir}/task-output + +# Compaction specific directories +compaction.input.dir=${gobblin.dataset.root.dir} +compaction.dest.dir=${gobblin.dataset.root.dir} +compaction.input.subdir=hourly +compaction.dest.subdir=daily +compaction.tmp.dest.dir=/tmp/${root.project.name}/${job.name} + +# FS URIs +source.filebased.fs.uri=${fs.uri} +writer.fs.uri=${fs.uri} +compaction.file.system.uri=${fs.uri} +state.store.fs.uri=${fs.uri} + +# Helix Configuration +gobblin.cluster.helix.cluster.name=${gobblin.yarn.app.name} +gobblin.cluster.helix.overwrite=false +helix.job.timeout.seconds=9223372036854774 +helix.task.timeout.seconds=9223372036854774 + +# Yarn Configuration +gobblin.yarn.app.name=${root.project.name}-${grid.name} +# No. of ms to wait between sending a SIGTERM and SIGKILL to a container +yarn.nodemanager.sleep-delay-before-sigkill.ms=30000 +gobblin.yarn.work.dir=/tmp/${root.project.name}/${job.name} +gobblin.yarn.logs.sink.root.dir=${logs.dir}/gobblin/${root.project.name}/logs + +# Gobblin Cluster +gobblin.cluster.specConsumer.path=${home.dir}/streaming/specs +gobblin.cluster.workflow.expirySeconds=9223372036854774 +gobblin.cluster.job.conf.path=./jobs \ No newline at end of file diff --git a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/properties/local.properties b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/properties/local.properties new file mode 100644 index 0000000..d07f636 --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/properties/local.properties @@ -0,0 +1,21 @@ +# Misc. deployment/cluster specific variables +user.name=yourname +logs.user.name=yourname +root.project.name=gobblin-kafka-streaming-local +grid.name=cluster-name + + +# Cluster specific directory configurations +# home.dir and root.data.location should be unique per cluster deployment + +home.dir=/jobs/${user.name} +logs.dir=/jobs/${logs.user.name} +root.data.location=/data + +fs.uri=file:/// + +# Yarn Configuration +gobblin.yarn.app.queue=default + +# Gobblin Cluster +gobblin.yarn.app.name=${root.project.name}-${grid.name}-1 \ No newline at end of file diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java index f2ecf9d..8a35fa2 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java @@ -288,7 +288,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { client.close(); } } catch (IOException e) { - throw new RuntimeException("Exception closing kafkaConsumerClient"); + throw new RuntimeException("Exception closing kafkaConsumerClient", e); } } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java index bd01a51..8ec6fed 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java @@ -530,7 +530,7 @@ public abstract class AbstractJobLauncher implements JobLauncher { } } catch (Throwable t) { jobState.setState(JobState.RunningState.FAILED); - String errMsg = "Failed to launch and run job " + jobId; + String errMsg = "Failed to launch and run job " + jobId + " due to" + t.getMessage(); LOG.error(errMsg + ": " + t, t); this.jobContext.getJobState().setJobFailureException(t); } finally { diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java index 1b72be9..d516c80 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java @@ -64,7 +64,7 @@ import org.apache.gobblin.util.FileListUtils; /** - * A utility class that periodically reads log files in a source log file directory for changes + * A utility service that periodically reads log files in a source log file directory for changes * since the last reads and appends the changes to destination log files with the same names as * the source log files in a destination log directory. The source and destination log files * can be on different {@link FileSystem}s. @@ -507,7 +507,7 @@ public class LogCopier extends AbstractScheduledService { } /** - * Copy changes for a single log file. + * Copy log files that have been rolled over. */ private void copyChangesOfLogFile(Path srcFile, Path destFile) throws IOException { LOGGER.info("Copying changes from {} to {}", srcFile.toString(), destFile.toString()); @@ -516,7 +516,6 @@ public class LogCopier extends AbstractScheduledService { return; } - // We need to use fsDataInputStream in the finally clause so it has to be defined outside try-catch-finally FSDataInputStream fsDataInputStream = null; try (Closer closer = Closer.create()) { diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java index d6da93e..2a0410e 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java @@ -53,7 +53,7 @@ import org.apache.gobblin.util.ExecutorsUtils; /** * <p> * The super class for key management - * This class uses a scheduled task to do re-login to refetch token on a + * This class uses a scheduled task to do re-login to re-fetch token on a * configurable schedule. It also uses a second scheduled task * to renew the delegation token after each login. Both the re-login interval and the token * renewing interval are configurable. diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index e0a66e8..6770510 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -656,6 +656,10 @@ public class GobblinYarnAppLauncher { private void addLibJars(Path srcLibJarDir, Optional<Map<String, LocalResource>> resourceMap, Path destDir) throws IOException { FileSystem localFs = FileSystem.getLocal(this.yarnConfiguration); + if (! this.fs.exists(srcLibJarDir)) { + throw new IllegalStateException(String.format("The library directory[%s] are not being found, abort the application", srcLibJarDir)); + } + FileStatus[] libJarFiles = localFs.listStatus(srcLibJarDir); if (libJarFiles == null || libJarFiles.length == 0) { return; @@ -687,9 +691,13 @@ public class GobblinYarnAppLauncher { for (String localFilePath : SPLITTER.split(localFilePathList)) { Path srcFilePath = new Path(localFilePath); Path destFilePath = new Path(destDir, srcFilePath.getName()); - this.fs.copyFromLocalFile(srcFilePath, destFilePath); - if (resourceMap.isPresent()) { - YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + if (fs.exists(srcFilePath)) { + this.fs.copyFromLocalFile(srcFilePath, destFilePath); + if (resourceMap.isPresent()) { + YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get()); + } + } else { + LOGGER.warn(String.format("The request file %s doesn't exist", srcFilePath)); } } } diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java index 224bad8..16c82a5 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java @@ -88,6 +88,7 @@ public class GobblinYarnConfigurationKeys { public static final String TOKEN_RENEW_INTERVAL_IN_MINUTES = GOBBLIN_YARN_PREFIX + "token.renew.interval.minutes"; public static final Long DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES = Long.MAX_VALUE; // Resource/dependencies configuration properties. + // Missing this configuration should throw fatal exceptions to avoid harder-to-debug situation from Yarn container side. public static final String LIB_JARS_DIR_KEY = GOBBLIN_YARN_PREFIX + "lib.jars.dir"; public static final String LIB_JARS_DIR_NAME = "_libjars"; diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java index 21858e0..64127ac 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java @@ -41,11 +41,6 @@ import org.apache.gobblin.util.logs.LogCopier; /** * A base class for container processes that are sources of Gobblin Yarn application logs. * - * <p> - * The source log files are supposed to be on the local {@link FileSystem} and will - * be copied to a given destination {@link FileSystem}, which is typically HDFS. - * </p> - * * @author Yinan Li */ class GobblinYarnLogSource { @@ -67,6 +62,7 @@ class GobblinYarnLogSource { /** * Build a {@link LogCopier} instance used to copy the logs out from this {@link GobblinYarnLogSource}. + * TODO: This is duplicated to the org.apache.gobblin.yarn.GobblinYarnAppLauncher#buildLogCopier(com.typesafe.config.Config, org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path) * * @param config the {@link Config} use to create the {@link LogCopier} * @param containerId the {@link ContainerId} of the container the {@link LogCopier} runs in diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java index 5be2a4b..0742f54 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java @@ -196,6 +196,8 @@ public class YarnAutoScalingManager extends AbstractIdleService { // adjust the number of target containers based on the configured min and max container values. numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers)); + log.info("There are {} containers being requested", numTargetContainers); + this.yarnService.requestTargetNumberOfContainers(numTargetContainers, inUseInstances); } } diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java index 9c37748..0d2f3a5 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java @@ -116,7 +116,7 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase { private final Closer closer = Closer.create(); - private static void setEnv(String key, String value) { + public static void setEnv(String key, String value) { try { Map<String, String> env = System.getenv(); Class<?> cl = env.getClass(); @@ -250,6 +250,10 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase { */ @Test(enabled=false, groups = { "disabledOnTravis" }, dependsOnMethods = "testCreateHelixCluster") public void testSetupAndSubmitApplication() throws Exception { + HelixUtils.createGobblinHelixCluster( + this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY), + this.config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY)); + this.gobblinYarnAppLauncher.startYarnClient(); this.applicationId = this.gobblinYarnAppLauncher.setupAndSubmitApplication();