Repository: incubator-gobblin Updated Branches: refs/heads/master b7b2bd9d1 -> 462ea8cf2
[GOBBLIN-329] Add a new basic cluster integration test Closes #2181 from HappyRay/add-cluster-basic-integration-test Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/462ea8cf Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/462ea8cf Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/462ea8cf Branch: refs/heads/master Commit: 462ea8cf214f22000b198f813979930b4d2dda4b Parents: b7b2bd9 Author: Ray Yang <[email protected]> Authored: Mon Dec 4 15:21:02 2017 -0800 Committer: Hung Tran <[email protected]> Committed: Mon Dec 4 15:21:02 2017 -0800 ---------------------------------------------------------------------- .../GobblinClusterConfigurationKeys.java | 2 +- .../gobblin/cluster/GobblinClusterUtils.java | 6 +- .../gobblin/cluster/ClusterIntegrationTest.java | 220 +++++++++++++++++++ .../src/test/resources/BasicCluster.conf | 24 ++ .../src/test/resources/HelloWorldJob.conf | 45 ++++ 5 files changed, 293 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/462ea8cf/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index ab6f8b4..653babf 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -35,7 +35,7 @@ public class GobblinClusterConfigurationKeys { public static final String STANDALONE_CLUSTER_MODE = "standalone_cluster"; public static final String 
STANDALONE_CLUSTER_MODE_KEY = GOBBLIN_CLUSTER_PREFIX + "standaloneMode"; public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false; - public static final String CLUSTRER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + "workDir"; + public static final String CLUSTER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + "workDir"; // Helix configuration properties. public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + "helix.cluster.name"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/462ea8cf/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java index 3082720..3f53443 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java @@ -17,7 +17,7 @@ package org.apache.gobblin.cluster; -import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTRER_WORK_DIR; +import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR; import com.typesafe.config.Config; import java.net.InetAddress; @@ -54,8 +54,8 @@ public class GobblinClusterUtils { public static Path getAppWorkDirPathFromConfig(Config config, FileSystem fs, String applicationName, String applicationId) { - if (config.hasPath(CLUSTRER_WORK_DIR)) { - return new Path(config.getString(CLUSTRER_WORK_DIR)); + if (config.hasPath(CLUSTER_WORK_DIR)) { + return new Path(config.getString(CLUSTER_WORK_DIR)); } return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName, applicationId)); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/462ea8cf/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java 
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
new file mode 100644
index 0000000..b54db1a
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Resources;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+
+/**
+ * End-to-end test of a minimal Gobblin cluster running in this JVM: a testing ZooKeeper server,
+ * one {@link GobblinTaskRunner} worker and one {@link GobblinClusterManager}. The test copies
+ * {@code HelloWorldJob.conf} into the cluster's job configuration directory and waits for the job
+ * to publish its output file.
+ */
+public class ClusterIntegrationTest {
+
+  private static final Logger _logger = LoggerFactory.getLogger(ClusterIntegrationTest.class);
+  public static final String JOB_CONF_NAME = "HelloWorldJob.conf";
+
+  // Number of .txt files the job is expected to publish under the job output directory.
+  private static final int EXPECTED_NUM_OF_OUTPUT_FILES = 1;
+
+  private Config _config;
+  private Path _workPath;
+  private Path _jobConfigPath;
+  private Path _jobOutputBasePath;
+  private URL _jobConfResourceUrl;
+  private TestingServer _testingZKServer;
+  private GobblinTaskRunner _worker;
+  private GobblinClusterManager _manager;
+
+  /**
+   * Runs the HelloWorld job on a freshly started cluster and waits for its output. The cluster is
+   * shut down in {@link #tearDown()} so that it is also stopped when this test fails.
+   */
+  @Test
+  public void simpleJobShouldComplete() throws Exception {
+    init();
+    startCluster();
+    waitForAndVerifyOutputFiles();
+  }
+
+  /** Prepares the work directory, ZooKeeper server, configuration and job directories. */
+  private void init() throws Exception {
+    initWorkDir();
+    initZooKeeper();
+    initConfig();
+    initJobConfDir();
+    initJobOutputDir();
+  }
+
+  /** (Re)creates an empty work directory, relative to the current directory. */
+  private void initWorkDir() throws IOException {
+    _workPath = Paths.get("gobblin-integration-test-work-dir");
+
+    // Delete the working directory in case the previous test failed to delete it,
+    // e.g. when the test was killed forcefully under a debugger.
+    deleteWorkDir();
+    Files.createDirectory(_workPath);
+    _logger.info("Created a new work directory: {}", _workPath.toAbsolutePath());
+  }
+
+  /** Creates the cluster's job configuration directory and copies the test job config into it. */
+  private void initJobConfDir() throws IOException {
+    String jobConfigDir = _config.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY);
+    _jobConfigPath = Paths.get(jobConfigDir);
+    Files.createDirectories(_jobConfigPath);
+    _jobConfResourceUrl = Resources.getResource(JOB_CONF_NAME);
+    copyJobConfFromResource();
+  }
+
+  /**
+   * Creates the directory the job publishes into; it must match data.publisher.final.dir
+   * (${gobblin.cluster.workDir}/job-output) in HelloWorldJob.conf.
+   */
+  private void initJobOutputDir() throws IOException {
+    _jobOutputBasePath = _workPath.resolve("job-output");
+    Files.createDirectory(_jobOutputBasePath);
+  }
+
+  private void copyJobConfFromResource() throws IOException {
+    try (InputStream resourceStream = _jobConfResourceUrl.openStream()) {
+      File targetFile = _jobConfigPath.resolve(JOB_CONF_NAME).toFile();
+      FileUtils.copyInputStreamToFile(resourceStream, targetFile);
+    }
+  }
+
+  /** Creates the testing ZooKeeper server without starting it; see {@link #startCluster()}. */
+  private void initZooKeeper() throws Exception {
+    _testingZKServer = new TestingServer(false);
+    _logger.info("Created testing ZK Server. Connection string : {}",
+        _testingZKServer.getConnectString());
+  }
+
+  /** Builds the cluster config: test-specific overrides layered over BasicCluster.conf. */
+  private void initConfig() {
+    Config configFromResource = getConfigFromResource();
+    Config configOverride = getConfigOverride();
+    _config = configOverride.withFallback(configFromResource).resolve();
+  }
+
+  /** Points the cluster at the testing ZooKeeper server and at this test's work directory. */
+  private Config getConfigOverride() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY,
+        _testingZKServer.getConnectString());
+    configMap.put(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR, _workPath.toString());
+    return ConfigFactory.parseMap(configMap);
+  }
+
+  private Config getConfigFromResource() {
+    URL url = Resources.getResource("BasicCluster.conf");
+    return ConfigFactory.parseURL(url);
+  }
+
+  /**
+   * Stops the cluster (if it was started) and then deletes the work directory. Running the
+   * shutdown here rather than at the end of the test body means it also happens when the test
+   * fails, so no worker thread or ZooKeeper server is leaked into later tests.
+   */
+  @AfterMethod
+  public void tearDown() throws InterruptedException, IOException {
+    try {
+      shutdownCluster();
+    } finally {
+      deleteWorkDir();
+    }
+  }
+
+  private void deleteWorkDir() throws IOException {
+    if ((_workPath != null) && Files.exists(_workPath)) {
+      FileUtils.deleteDirectory(_workPath.toFile());
+    }
+  }
+
+  private void createHelixCluster() {
+    String zkConnectionString = _config
+        .getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    String helixClusterName = _config
+        .getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    HelixUtils.createGobblinHelixCluster(zkConnectionString, helixClusterName);
+  }
+
+  /** Starts ZooKeeper, creates the Helix cluster, then starts the worker and the manager. */
+  private void startCluster() throws Exception {
+    _testingZKServer.start();
+    createHelixCluster();
+    startWorker();
+    startManager();
+  }
+
+  private void startWorker() throws Exception {
+    _worker = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, "Worker",
+        TestHelper.TEST_APPLICATION_ID, "1",
+        _config, Optional.absent());
+
+    // Need to run in another thread since the start call will not return until the stop method
+    // is called. Daemon, so a worker that never stops cannot keep the test JVM alive.
+    Thread workerThread = new Thread(_worker::start, "ClusterIntegrationTest-worker");
+    workerThread.setDaemon(true);
+    workerThread.start();
+  }
+
+  private void startManager() throws Exception {
+    _manager = new GobblinClusterManager(TestHelper.TEST_APPLICATION_NAME,
+        TestHelper.TEST_APPLICATION_ID,
+        _config, Optional.absent());
+
+    _manager.start();
+  }
+
+  /**
+   * Stops the cluster components that were created and closes the ZooKeeper server. Components
+   * that were never created (init or start failed part-way) are skipped, and fields are reset so
+   * a second call is a no-op.
+   */
+  private void shutdownCluster() throws InterruptedException, IOException {
+    try {
+      if (_worker != null) {
+        _worker.stop();
+      }
+      if (_manager != null) {
+        _manager.stop();
+      }
+    } finally {
+      // Close ZooKeeper even if stopping a component failed.
+      if (_testingZKServer != null) {
+        _testingZKServer.close();
+      }
+      _worker = null;
+      _manager = null;
+      _testingZKServer = null;
+    }
+  }
+
+  /** Polls with backoff (60s timeout) until the job's output file has been published. */
+  private void waitForAndVerifyOutputFiles() throws Exception {
+    AssertWithBackoff asserter = AssertWithBackoff.create().logger(_logger).timeoutMs(60_000)
+        .maxSleepMs(100).backoffFactor(1.5);
+
+    asserter.assertTrue(this::haveExpectedFilesBeenCreated, "Waiting for job-completion");
+  }
+
+  private boolean haveExpectedFilesBeenCreated(Void input) {
+    return getNumOfOutputFiles(_jobOutputBasePath) == EXPECTED_NUM_OF_OUTPUT_FILES;
+  }
+
+  /** Counts the .txt files anywhere under {@code jobOutputDir}. */
+  private int getNumOfOutputFiles(Path jobOutputDir) {
+    Collection<File> outputFiles = FileUtils
+        .listFiles(jobOutputDir.toFile(), new String[]{"txt"}, true);
+    return outputFiles.size();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/462ea8cf/gobblin-cluster/src/test/resources/BasicCluster.conf
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/resources/BasicCluster.conf b/gobblin-cluster/src/test/resources/BasicCluster.conf
new file mode 100644
index 0000000..a42b41d
--- /dev/null
+++ b/gobblin-cluster/src/test/resources/BasicCluster.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Cluster / Helix configuration properties +gobblin.cluster.helix.cluster.name=BasicGobblinCluster +gobblin.cluster.workDir=/tmp/gobblinClusterBasicTest/ +gobblin.cluster.job.conf.path=${gobblin.cluster.workDir}/jobs +gobblin.cluster.standaloneMode=true +gobblin.cluster.job.executeInSchedulingThread=false +jobexecutor.threadpool.size=20 http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/462ea8cf/gobblin-cluster/src/test/resources/HelloWorldJob.conf ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/resources/HelloWorldJob.conf b/gobblin-cluster/src/test/resources/HelloWorldJob.conf new file mode 100644 index 0000000..7db6b09 --- /dev/null +++ b/gobblin-cluster/src/test/resources/HelloWorldJob.conf @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Number of hellos to generate +gobblin.source.helloWorld.numHellos=1 + +# Job Identification +job.name=HelloWorldTestJob +job.group=HelloWorldGroup +job.description="The Hello World Gobblin job" + +source.class=org.apache.gobblin.util.test.HelloWorldSource + +# also works with local file system +writer.destination.type=HDFS +writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder +writer.output.format=txt +writer.staging.dir=${gobblin.cluster.workDir}/writer-staging +writer.output.dir=${gobblin.cluster.workDir}/writer-output + +# Need the converter since this writer accepts bytes only. +converter.classes=org.apache.gobblin.converter.string.StringToBytesConverter + +data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher +data.publisher.final.dir=${gobblin.cluster.workDir}/job-output +data.publisher.replace.final.dir=false + +state.store.enabled=false + +# Miscellaneous +job.lock.enabled=false
