Repository: incubator-gobblin Updated Branches: refs/heads/master 2f7694769 -> 01302a6db
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java index ad4180a..2b4332f 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java @@ -16,13 +16,11 @@ */ package org.apache.gobblin.service.modules.core; -import org.apache.gobblin.service.FlowId; -import org.apache.gobblin.service.Schedule; import java.io.File; import java.util.Map; import java.util.Properties; - import java.util.UUID; + import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingServer; import org.apache.hadoop.fs.Path; @@ -49,17 +47,22 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; import org.apache.gobblin.service.FlowConfig; import org.apache.gobblin.service.FlowConfigClient; -import org.apache.gobblin.service.modules.utils.HelixUtils; +import org.apache.gobblin.service.FlowId; +import org.apache.gobblin.service.Schedule; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.orchestration.Orchestrator; +import org.apache.gobblin.service.modules.utils.HelixUtils; import org.apache.gobblin.util.ConfigUtils; - +@Test public class GobblinServiceHATest { private static final Logger logger = LoggerFactory.getLogger(GobblinServiceHATest.class); private static Gson gson = new GsonBuilder().setPrettyPrinting().create(); + private static final String QUARTZ_INSTANCE_NAME = "org.quartz.scheduler.instanceName"; + private static final String QUARTZ_THREAD_POOL_COUNT = "org.quartz.threadPool.threadCount"; + private static final String COMMON_SPEC_STORE_PARENT_DIR = "/tmp/serviceCoreCommon/"; private static final String NODE_1_SERVICE_WORK_DIR = "/tmp/serviceWorkDirNode1/"; @@ -147,19 +150,23 @@ public class GobblinServiceHATest { node1ServiceCoreProperties.putAll(commonServiceCoreProperties); node1ServiceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, NODE_1_TOPOLOGY_SPEC_STORE_DIR); node1ServiceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, NODE_1_FLOW_SPEC_STORE_DIR); + node1ServiceCoreProperties.put(QUARTZ_INSTANCE_NAME, "QuartzScheduler1"); + node1ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3); Properties node2ServiceCoreProperties = new Properties(); node2ServiceCoreProperties.putAll(commonServiceCoreProperties); node2ServiceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, NODE_2_TOPOLOGY_SPEC_STORE_DIR); node2ServiceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, NODE_2_FLOW_SPEC_STORE_DIR); + node2ServiceCoreProperties.put(QUARTZ_INSTANCE_NAME, "QuartzScheduler2"); + node2ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3); // Start Node 1 - this.node1GobblinServiceManager = new GobblinServiceManager("CoreService", "1", + this.node1GobblinServiceManager = new GobblinServiceManager("CoreService1", "1", ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), Optional.of(new Path(NODE_1_SERVICE_WORK_DIR))); this.node1GobblinServiceManager.start(); // Start Node 2 - this.node2GobblinServiceManager = new GobblinServiceManager("CoreService", "1", + this.node2GobblinServiceManager = new GobblinServiceManager("CoreService2", "2", ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), Optional.of(new Path(NODE_2_SERVICE_WORK_DIR))); this.node2GobblinServiceManager.start(); @@ -183,6 +190,7 @@ public class GobblinServiceHATest { public void cleanUp() throws Exception { // Shutdown Node 1 try { + logger.info("+++++++++++++++++++ start shutdown noad1"); this.node1GobblinServiceManager.stop(); } catch (Exception e) { logger.warn("Could not cleanly stop Node 1 of Gobblin Service", e); @@ -190,6 +198,7 @@ public class GobblinServiceHATest { // Shutdown Node 2 try { + logger.info("+++++++++++++++++++ start shutdown noad2"); this.node2GobblinServiceManager.stop(); } catch (Exception e) { logger.warn("Could not cleanly stop Node 2 of Gobblin Service", e); @@ -231,6 +240,7 @@ public class GobblinServiceHATest { @Test public void testCreate() throws Exception { + logger.info("+++++++++++++++++++ testCreate START"); Map<String, String> flowProperties = Maps.newHashMap(); flowProperties.put("param1", "value1"); flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME); @@ -256,8 +266,10 @@ public class GobblinServiceHATest { GobblinServiceManager master; if (this.node1GobblinServiceManager.isLeader()) { master = this.node1GobblinServiceManager; + logger.info("#### node 1 is manager"); } else if (this.node2GobblinServiceManager.isLeader()) { master = this.node2GobblinServiceManager; + logger.info("#### node 2 is manager"); } else { Assert.fail("No leader found in service cluster"); return; @@ -265,6 +277,12 @@ public class GobblinServiceHATest { int attempt = 0; boolean assertSuccess = false; + + // Below while-loop will read all flow specs, but some of them are being persisted. + // We have seen CRC file java.io.EOFException when reading and writing at the same time. + // Wait for a few seconds to guarantee all the flow specs are persisted. + Thread.sleep(3000); + while (attempt < 800) { int masterJobs = master.flowCatalog.getSpecs().size(); if (masterJobs == 2) { @@ -278,10 +296,13 @@ public class GobblinServiceHATest { logger.info("Total scheduling time in ms: " + (schedulingEndTime - schedulingStartTime)); Assert.assertTrue(assertSuccess, "Flow that was created is not reflecting in FlowCatalog"); + logger.info("+++++++++++++++++++ testCreate END"); } + @Test (dependsOnMethods = "testCreate") public void testCreateAgain() throws Exception { + logger.info("+++++++++++++++++++ testCreateAgain START"); Map<String, String> flowProperties = Maps.newHashMap(); flowProperties.put("param1", "value1"); flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME); @@ -310,10 +331,13 @@ public class GobblinServiceHATest { } catch (RestLiResponseException e) { Assert.fail("Create Again should pass without complaining that the spec already exists."); } + + logger.info("+++++++++++++++++++ testCreateAgain END"); } @Test (dependsOnMethods = "testCreateAgain") public void testGet() throws Exception { + logger.info("+++++++++++++++++++ testGet START"); FlowId flowId1 = new FlowId().setFlowGroup(TEST_GROUP_NAME_1).setFlowName(TEST_FLOW_NAME_1); FlowConfig flowConfig1 = this.node1FlowConfigClient.getFlowConfig(flowId1); @@ -331,10 +355,13 @@ public class GobblinServiceHATest { Assert.assertEquals(flowConfig1.getTemplateUris(), TEST_TEMPLATE_URI_1); Assert.assertTrue(flowConfig1.getSchedule().isRunImmediately()); Assert.assertEquals(flowConfig1.getProperties().get("param1"), "value1"); + + logger.info("+++++++++++++++++++ testGet END"); } @Test (dependsOnMethods = "testGet") public void testUpdate() throws Exception { + logger.info("+++++++++++++++++++ testUpdate START"); // Update on one node and retrieve from another FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME_1).setFlowName(TEST_FLOW_NAME_1); @@ -360,10 +387,13 @@ public class GobblinServiceHATest { Assert.assertFalse(retrievedFlowConfig.getSchedule().isRunImmediately()); Assert.assertEquals(retrievedFlowConfig.getProperties().get("param1"), "value1b"); Assert.assertEquals(retrievedFlowConfig.getProperties().get("param2"), "value2b"); + + logger.info("+++++++++++++++++++ testUpdate END"); } @Test (dependsOnMethods = "testUpdate") public void testDelete() throws Exception { + logger.info("+++++++++++++++++++ testDelete START"); FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME_1).setFlowName(TEST_FLOW_NAME_1); // make sure flow config exists @@ -387,10 +417,13 @@ public class GobblinServiceHATest { } catch (RestLiResponseException e) { Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404); } + + logger.info("+++++++++++++++++++ testDelete END"); } @Test (dependsOnMethods = "testDelete") public void testBadGet() throws Exception { + logger.info("+++++++++++++++++++ testBadGet START"); FlowId flowId = new FlowId().setFlowGroup(TEST_DUMMY_GROUP_NAME_1).setFlowName(TEST_DUMMY_FLOW_NAME_1); try { @@ -406,10 +439,13 @@ public class GobblinServiceHATest { } catch (RestLiResponseException e) { Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404); } + + logger.info("+++++++++++++++++++ testBadGet END"); } @Test (dependsOnMethods = "testBadGet") public void testBadDelete() throws Exception { + logger.info("+++++++++++++++++++ testBadDelete START"); FlowId flowId = new FlowId().setFlowGroup(TEST_DUMMY_GROUP_NAME_1).setFlowName(TEST_DUMMY_FLOW_NAME_1); try { @@ -425,10 +461,13 @@ public class GobblinServiceHATest { } catch (RestLiResponseException e) { Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404); } + + logger.info("+++++++++++++++++++ testBadDelete END"); } @Test (dependsOnMethods = "testBadDelete") public void testBadUpdate() throws Exception { + logger.info("+++++++++++++++++++ testBadUpdate START"); Map<String, String> flowProperties = Maps.newHashMap(); flowProperties.put("param1", "value1b"); flowProperties.put("param2", "value2b"); @@ -449,10 +488,13 @@ public class GobblinServiceHATest { } catch (RestLiResponseException e) { Assert.fail("Bad update should pass without complaining that the spec does not exists."); } + + logger.info("+++++++++++++++++++ testBadUpdate END"); } @Test (dependsOnMethods = "testBadUpdate") public void testKillNode() throws Exception { + logger.info("+++++++++++++++++++ testKillNode START"); GobblinServiceManager master, secondary; if (this.node1GobblinServiceManager.isLeader()) { master = this.node1GobblinServiceManager; @@ -500,5 +542,7 @@ public class GobblinServiceHATest { logger.info("Total failover time in ms: " + (failOverEndTime - failOverStartTime)); Assert.assertTrue(assertSuccess, "New master should take over all old master jobs."); + + logger.info("+++++++++++++++++++ testKillNode END"); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/test/java/org/apache/gobblin/service/modules/restli/FlowConfigUtilsTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/restli/FlowConfigUtilsTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/restli/FlowConfigUtilsTest.java new file mode 100644 index 0000000..769137f --- /dev/null +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/restli/FlowConfigUtilsTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.restli; + +import java.io.IOException; +import java.util.Properties; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.Maps; +import com.linkedin.data.template.RequiredFieldNotPresentException; +import com.linkedin.data.template.StringMap; + +import org.apache.gobblin.service.FlowConfig; +import org.apache.gobblin.service.FlowConfigLoggedException; +import org.apache.gobblin.service.FlowConfigResourceLocalHandler; +import org.apache.gobblin.service.FlowId; +import org.apache.gobblin.service.Schedule; + + +@Test +public class FlowConfigUtilsTest { + private void testFlowSpec(FlowConfig flowConfig) { + try { + FlowConfigResourceLocalHandler.createFlowSpecForConfig(flowConfig); + } catch (FlowConfigLoggedException e) { + Assert.fail("Should not get to here"); + } + } + + private void testSerDer(FlowConfig flowConfig) { + try { + String serialized = FlowConfigUtils.serializeFlowConfig(flowConfig); + FlowConfig newFlowConfig = FlowConfigUtils.deserializeFlowConfig(serialized); + Assert.assertTrue(testEqual(flowConfig, newFlowConfig)); + } catch (IOException e) { + Assert.fail("Should not get to here"); + } + } + + /** + * Due to default value setting, flow config after deserialization might contain default value. + * Only check f1.equals(f2) is not enough + */ + private boolean testEqual(FlowConfig f1, FlowConfig f2) { + if (f1.equals(f2)) { + return true; + } + + // Check Id + Assert.assertTrue(f1.hasId() == f2.hasId()); + Assert.assertTrue(f1.getId().equals(f2.getId())); + + // Check Schedule + Assert.assertTrue(f1.hasSchedule() == f2.hasSchedule()); + if (f1.hasSchedule()) { + Schedule s1 = f1.getSchedule(); + Schedule s2 = f2.getSchedule(); + Assert.assertTrue(s1.getCronSchedule().equals(s2.getCronSchedule())); + Assert.assertTrue(s1.isRunImmediately().equals(s2.isRunImmediately())); + } + + // Check Template URI + Assert.assertTrue(f1.hasTemplateUris() == f2.hasTemplateUris()); + if (f1.hasTemplateUris()) { + Assert.assertTrue(f1.getTemplateUris().equals(f2.getTemplateUris())); + } + + // Check Properties + Assert.assertTrue(f1.hasProperties() == f2.hasProperties()); + if (f1.hasProperties()) { + Assert.assertTrue(f1.getProperties().equals(f2.getProperties())); + } + + return true; + } + + public void testFullFlowConfig() { + FlowConfig flowConfig = new FlowConfig().setId(new FlowId() + .setFlowName("SN_CRMSYNC") + .setFlowGroup("DYNAMICS-USER-123456789")); + flowConfig.setSchedule(new Schedule() + .setCronSchedule("0 58 2/12 ? * * *") + .setRunImmediately(Boolean.valueOf("true"))); + + flowConfig.setTemplateUris("FS:///my.template"); + Properties properties = new Properties(); + properties.put("gobblin.flow.sourceIdentifier", "dynamicsCrm"); + properties.put("gobblin.flow.destinationIdentifier", "espresso"); + flowConfig.setProperties(new StringMap(Maps.fromProperties(properties))); + + testFlowSpec(flowConfig); + testSerDer(flowConfig); + } + + public void testFlowConfigWithoutSchedule() { + FlowConfig flowConfig = new FlowConfig().setId(new FlowId() + .setFlowName("SN_CRMSYNC") + .setFlowGroup("DYNAMICS-USER-123456789")); + + flowConfig.setTemplateUris("FS:///my.template"); + Properties properties = new Properties(); + properties.put("gobblin.flow.sourceIdentifier", "dynamicsCrm"); + properties.put("gobblin.flow.destinationIdentifier", "espresso"); + flowConfig.setProperties(new StringMap(Maps.fromProperties(properties))); + + testFlowSpec(flowConfig); + testSerDer(flowConfig); + } + + public void testFlowConfigWithDefaultRunImmediately() { + FlowConfig flowConfig = new FlowConfig().setId(new FlowId() + .setFlowName("SN_CRMSYNC") + .setFlowGroup("DYNAMICS-USER-123456789")); + flowConfig.setSchedule(new Schedule() + .setCronSchedule("0 58 2/12 ? * * *")); + + flowConfig.setTemplateUris("FS:///my.template"); + Properties properties = new Properties(); + properties.put("gobblin.flow.sourceIdentifier", "dynamicsCrm"); + properties.put("gobblin.flow.destinationIdentifier", "espresso"); + flowConfig.setProperties(new StringMap(Maps.fromProperties(properties))); + + testFlowSpec(flowConfig); + testSerDer(flowConfig); + } + + public void testFlowConfigWithoutTemplateUri() { + FlowConfig flowConfig = new FlowConfig().setId(new FlowId() + .setFlowName("SN_CRMSYNC") + .setFlowGroup("DYNAMICS-USER-123456789")); + flowConfig.setSchedule(new Schedule() + .setCronSchedule("0 58 2/12 ? * * *")); + + Properties properties = new Properties(); + properties.put("gobblin.flow.sourceIdentifier", "dynamicsCrm"); + properties.put("gobblin.flow.destinationIdentifier", "espresso"); + flowConfig.setProperties(new StringMap(Maps.fromProperties(properties))); + + try { + FlowConfigResourceLocalHandler.createFlowSpecForConfig(flowConfig); + Assert.fail("Should not get to here"); + } catch (RequiredFieldNotPresentException e) { + Assert.assertTrue(true, "templateUri cannot be empty"); + } + testSerDer(flowConfig); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/test/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/log4j.xml b/gobblin-service/src/test/resources/log4j.xml new file mode 100644 index 0000000..b13377c --- /dev/null +++ b/gobblin-service/src/test/resources/log4j.xml @@ -0,0 +1,40 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> + +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + <appender name="console" class="org.apache.log4j.ConsoleAppender"> + <param name="Target" value="System.out" /> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" + value="%d{yyyy-MM-dd HH:mm:ss z} %-5p [%t] %C %L - %m%n" /> + </layout> + </appender> + + <root> + <level value="info" /> + <appender-ref ref="console" /> + </root> + + <!-- Swallow annoying exceptions when creating a configuration. --> + <logger name="org.apache.hadoop.conf.Configuration"> + <level value="FATAL"/> + </logger> + +</log4j:configuration> http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java index ac7abcf..fb1e590 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java @@ -17,13 +17,16 @@ package org.apache.gobblin.util; +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; import java.util.Map; import java.util.Properties; import org.apache.commons.lang3.StringUtils; -import com.google.common.base.Preconditions; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; @@ -79,4 +82,20 @@ public class PropertiesUtils { return extractedProperties; } + + public static String serialize(Properties properties) throws IOException { + StringWriter outputWriter = new StringWriter(); + properties.store(outputWriter, ""); + String rst = outputWriter.toString(); + outputWriter.close(); + return rst; + } + + public static Properties deserialize(String serialized) throws IOException { + StringReader reader = new StringReader(serialized); + Properties properties = new Properties(); + properties.load(reader); + reader.close(); + return properties; + } }
