Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 2f7694769 -> 01302a6db


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index ad4180a..2b4332f 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -16,13 +16,11 @@
  */
 package org.apache.gobblin.service.modules.core;
 
-import org.apache.gobblin.service.FlowId;
-import org.apache.gobblin.service.Schedule;
 import java.io.File;
 import java.util.Map;
 import java.util.Properties;
-
 import java.util.UUID;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.fs.Path;
@@ -49,17 +47,22 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigClient;
-import org.apache.gobblin.service.modules.utils.HelixUtils;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
-
+@Test
 public class GobblinServiceHATest {
 
   private static final Logger logger = 
LoggerFactory.getLogger(GobblinServiceHATest.class);
   private static Gson gson = new GsonBuilder().setPrettyPrinting().create();
 
+  private static final String QUARTZ_INSTANCE_NAME = 
"org.quartz.scheduler.instanceName";
+  private static final String QUARTZ_THREAD_POOL_COUNT = 
"org.quartz.threadPool.threadCount";
+
   private static final String COMMON_SPEC_STORE_PARENT_DIR = 
"/tmp/serviceCoreCommon/";
 
   private static final String NODE_1_SERVICE_WORK_DIR = 
"/tmp/serviceWorkDirNode1/";
@@ -147,19 +150,23 @@ public class GobblinServiceHATest {
     node1ServiceCoreProperties.putAll(commonServiceCoreProperties);
     
node1ServiceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, 
NODE_1_TOPOLOGY_SPEC_STORE_DIR);
     node1ServiceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, 
NODE_1_FLOW_SPEC_STORE_DIR);
+    node1ServiceCoreProperties.put(QUARTZ_INSTANCE_NAME, "QuartzScheduler1");
+    node1ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
 
     Properties node2ServiceCoreProperties = new Properties();
     node2ServiceCoreProperties.putAll(commonServiceCoreProperties);
     
node2ServiceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, 
NODE_2_TOPOLOGY_SPEC_STORE_DIR);
     node2ServiceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, 
NODE_2_FLOW_SPEC_STORE_DIR);
+    node2ServiceCoreProperties.put(QUARTZ_INSTANCE_NAME, "QuartzScheduler2");
+    node2ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
 
     // Start Node 1
-    this.node1GobblinServiceManager = new GobblinServiceManager("CoreService", 
"1",
+    this.node1GobblinServiceManager = new 
GobblinServiceManager("CoreService1", "1",
         ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), 
Optional.of(new Path(NODE_1_SERVICE_WORK_DIR)));
     this.node1GobblinServiceManager.start();
 
     // Start Node 2
-    this.node2GobblinServiceManager = new GobblinServiceManager("CoreService", 
"1",
+    this.node2GobblinServiceManager = new 
GobblinServiceManager("CoreService2", "2",
         ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), 
Optional.of(new Path(NODE_2_SERVICE_WORK_DIR)));
     this.node2GobblinServiceManager.start();
 
@@ -183,6 +190,7 @@ public class GobblinServiceHATest {
   public void cleanUp() throws Exception {
     // Shutdown Node 1
     try {
+      logger.info("+++++++++++++++++++ start shutdown noad1");
       this.node1GobblinServiceManager.stop();
     } catch (Exception e) {
       logger.warn("Could not cleanly stop Node 1 of Gobblin Service", e);
@@ -190,6 +198,7 @@ public class GobblinServiceHATest {
 
     // Shutdown Node 2
     try {
+      logger.info("+++++++++++++++++++ start shutdown noad2");
       this.node2GobblinServiceManager.stop();
     } catch (Exception e) {
       logger.warn("Could not cleanly stop Node 2 of Gobblin Service", e);
@@ -231,6 +240,7 @@ public class GobblinServiceHATest {
 
   @Test
   public void testCreate() throws Exception {
+    logger.info("+++++++++++++++++++ testCreate START");
     Map<String, String> flowProperties = Maps.newHashMap();
     flowProperties.put("param1", "value1");
     flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, 
TEST_SOURCE_NAME);
@@ -256,8 +266,10 @@ public class GobblinServiceHATest {
     GobblinServiceManager master;
     if (this.node1GobblinServiceManager.isLeader()) {
       master = this.node1GobblinServiceManager;
+      logger.info("#### node 1 is manager");
     } else if (this.node2GobblinServiceManager.isLeader()) {
       master = this.node2GobblinServiceManager;
+      logger.info("#### node 2 is manager");
     } else {
       Assert.fail("No leader found in service cluster");
       return;
@@ -265,6 +277,12 @@ public class GobblinServiceHATest {
 
     int attempt = 0;
     boolean assertSuccess = false;
+
+    // Below while-loop will read all flow specs, but some of them are being 
persisted.
+    // We have seen CRC file java.io.EOFException when reading and writing at 
the same time.
+    // Wait for a few seconds to guarantee all the flow specs are persisted.
+    Thread.sleep(3000);
+
     while (attempt < 800) {
       int masterJobs = master.flowCatalog.getSpecs().size();
       if (masterJobs == 2) {
@@ -278,10 +296,13 @@ public class GobblinServiceHATest {
     logger.info("Total scheduling time in ms: " + (schedulingEndTime - 
schedulingStartTime));
 
     Assert.assertTrue(assertSuccess, "Flow that was created is not reflecting 
in FlowCatalog");
+    logger.info("+++++++++++++++++++ testCreate END");
   }
 
+
   @Test (dependsOnMethods = "testCreate")
   public void testCreateAgain() throws Exception {
+    logger.info("+++++++++++++++++++ testCreateAgain START");
     Map<String, String> flowProperties = Maps.newHashMap();
     flowProperties.put("param1", "value1");
     flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, 
TEST_SOURCE_NAME);
@@ -310,10 +331,13 @@ public class GobblinServiceHATest {
     } catch (RestLiResponseException e) {
       Assert.fail("Create Again should pass without complaining that the spec 
already exists.");
     }
+
+    logger.info("+++++++++++++++++++ testCreateAgain END");
   }
 
   @Test (dependsOnMethods = "testCreateAgain")
   public void testGet() throws Exception {
+    logger.info("+++++++++++++++++++ testGet START");
     FlowId flowId1 = new 
FlowId().setFlowGroup(TEST_GROUP_NAME_1).setFlowName(TEST_FLOW_NAME_1);
 
     FlowConfig flowConfig1 = this.node1FlowConfigClient.getFlowConfig(flowId1);
@@ -331,10 +355,13 @@ public class GobblinServiceHATest {
     Assert.assertEquals(flowConfig1.getTemplateUris(), TEST_TEMPLATE_URI_1);
     Assert.assertTrue(flowConfig1.getSchedule().isRunImmediately());
     Assert.assertEquals(flowConfig1.getProperties().get("param1"), "value1");
+
+    logger.info("+++++++++++++++++++ testGet END");
   }
 
   @Test (dependsOnMethods = "testGet")
   public void testUpdate() throws Exception {
+    logger.info("+++++++++++++++++++ testUpdate START");
     // Update on one node and retrieve from another
     FlowId flowId = new 
FlowId().setFlowGroup(TEST_GROUP_NAME_1).setFlowName(TEST_FLOW_NAME_1);
 
@@ -360,10 +387,13 @@ public class GobblinServiceHATest {
     Assert.assertFalse(retrievedFlowConfig.getSchedule().isRunImmediately());
     Assert.assertEquals(retrievedFlowConfig.getProperties().get("param1"), 
"value1b");
     Assert.assertEquals(retrievedFlowConfig.getProperties().get("param2"), 
"value2b");
+
+    logger.info("+++++++++++++++++++ testUpdate END");
   }
 
   @Test (dependsOnMethods = "testUpdate")
   public void testDelete() throws Exception {
+    logger.info("+++++++++++++++++++ testDelete START");
     FlowId flowId = new 
FlowId().setFlowGroup(TEST_GROUP_NAME_1).setFlowName(TEST_FLOW_NAME_1);
 
     // make sure flow config exists
@@ -387,10 +417,13 @@ public class GobblinServiceHATest {
     } catch (RestLiResponseException e) {
       Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
     }
+
+    logger.info("+++++++++++++++++++ testDelete END");
   }
 
   @Test (dependsOnMethods = "testDelete")
   public void testBadGet() throws Exception {
+    logger.info("+++++++++++++++++++ testBadGet START");
     FlowId flowId = new 
FlowId().setFlowGroup(TEST_DUMMY_GROUP_NAME_1).setFlowName(TEST_DUMMY_FLOW_NAME_1);
 
     try {
@@ -406,10 +439,13 @@ public class GobblinServiceHATest {
     } catch (RestLiResponseException e) {
       Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
     }
+
+    logger.info("+++++++++++++++++++ testBadGet END");
   }
 
   @Test (dependsOnMethods = "testBadGet")
   public void testBadDelete() throws Exception {
+    logger.info("+++++++++++++++++++ testBadDelete START");
     FlowId flowId = new 
FlowId().setFlowGroup(TEST_DUMMY_GROUP_NAME_1).setFlowName(TEST_DUMMY_FLOW_NAME_1);
 
     try {
@@ -425,10 +461,13 @@ public class GobblinServiceHATest {
     } catch (RestLiResponseException e) {
       Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
     }
+
+    logger.info("+++++++++++++++++++ testBadDelete END");
   }
 
   @Test (dependsOnMethods = "testBadDelete")
   public void testBadUpdate() throws Exception {
+    logger.info("+++++++++++++++++++ testBadUpdate START");
     Map<String, String> flowProperties = Maps.newHashMap();
     flowProperties.put("param1", "value1b");
     flowProperties.put("param2", "value2b");
@@ -449,10 +488,13 @@ public class GobblinServiceHATest {
     } catch (RestLiResponseException e) {
       Assert.fail("Bad update should pass without complaining that the spec 
does not exists.");
     }
+
+    logger.info("+++++++++++++++++++ testBadUpdate END");
   }
 
   @Test (dependsOnMethods = "testBadUpdate")
   public void testKillNode() throws Exception {
+    logger.info("+++++++++++++++++++ testKillNode START");
     GobblinServiceManager master, secondary;
     if (this.node1GobblinServiceManager.isLeader()) {
       master = this.node1GobblinServiceManager;
@@ -500,5 +542,7 @@ public class GobblinServiceHATest {
     logger.info("Total failover time in ms: " + (failOverEndTime - 
failOverStartTime));
 
     Assert.assertTrue(assertSuccess, "New master should take over all old 
master jobs.");
+
+    logger.info("+++++++++++++++++++ testKillNode END");
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/test/java/org/apache/gobblin/service/modules/restli/FlowConfigUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/restli/FlowConfigUtilsTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/restli/FlowConfigUtilsTest.java
new file mode 100644
index 0000000..769137f
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/restli/FlowConfigUtilsTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.restli;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+import com.linkedin.data.template.RequiredFieldNotPresentException;
+import com.linkedin.data.template.StringMap;
+
+import org.apache.gobblin.service.FlowConfig;
+import org.apache.gobblin.service.FlowConfigLoggedException;
+import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.Schedule;
+
+
+@Test
+public class FlowConfigUtilsTest {
+  private void testFlowSpec(FlowConfig flowConfig) {
+    try {
+      FlowConfigResourceLocalHandler.createFlowSpecForConfig(flowConfig);
+    } catch (FlowConfigLoggedException e) {
+      Assert.fail("Should not get to here");
+    }
+  }
+
+  private void testSerDer(FlowConfig flowConfig) {
+    try {
+      String serialized = FlowConfigUtils.serializeFlowConfig(flowConfig);
+      FlowConfig newFlowConfig = 
FlowConfigUtils.deserializeFlowConfig(serialized);
+      Assert.assertTrue(testEqual(flowConfig, newFlowConfig));
+    } catch (IOException e) {
+      Assert.fail("Should not get to here");
+    }
+  }
+
+  /**
+   * Due to default value setting, flow config after deserialization might 
contain default value.
+   * Only check f1.equals(f2) is not enough
+   */
+  private boolean testEqual(FlowConfig f1, FlowConfig f2) {
+    if (f1.equals(f2)) {
+      return true;
+    }
+
+    // Check Id
+    Assert.assertTrue(f1.hasId() == f2.hasId());
+    Assert.assertTrue(f1.getId().equals(f2.getId()));
+
+    // Check Schedule
+    Assert.assertTrue(f1.hasSchedule() == f2.hasSchedule());
+    if (f1.hasSchedule()) {
+       Schedule s1 = f1.getSchedule();
+       Schedule s2 = f2.getSchedule();
+       Assert.assertTrue(s1.getCronSchedule().equals(s2.getCronSchedule()));
+       Assert.assertTrue(s1.isRunImmediately().equals(s2.isRunImmediately()));
+    }
+
+    // Check Template URI
+    Assert.assertTrue(f1.hasTemplateUris() == f2.hasTemplateUris());
+    if (f1.hasTemplateUris()) {
+      Assert.assertTrue(f1.getTemplateUris().equals(f2.getTemplateUris()));
+    }
+
+    // Check Properties
+    Assert.assertTrue(f1.hasProperties() == f2.hasProperties());
+    if (f1.hasProperties()) {
+      Assert.assertTrue(f1.getProperties().equals(f2.getProperties()));
+    }
+
+    return true;
+  }
+
+  public void testFullFlowConfig() {
+    FlowConfig flowConfig = new FlowConfig().setId(new FlowId()
+        .setFlowName("SN_CRMSYNC")
+        .setFlowGroup("DYNAMICS-USER-123456789"));
+    flowConfig.setSchedule(new Schedule()
+        .setCronSchedule("0 58 2/12 ? * * *")
+        .setRunImmediately(Boolean.valueOf("true")));
+
+    flowConfig.setTemplateUris("FS:///my.template");
+    Properties properties = new Properties();
+    properties.put("gobblin.flow.sourceIdentifier", "dynamicsCrm");
+    properties.put("gobblin.flow.destinationIdentifier", "espresso");
+    flowConfig.setProperties(new StringMap(Maps.fromProperties(properties)));
+
+    testFlowSpec(flowConfig);
+    testSerDer(flowConfig);
+  }
+
+  public void testFlowConfigWithoutSchedule() {
+    FlowConfig flowConfig = new FlowConfig().setId(new FlowId()
+        .setFlowName("SN_CRMSYNC")
+        .setFlowGroup("DYNAMICS-USER-123456789"));
+
+    flowConfig.setTemplateUris("FS:///my.template");
+    Properties properties = new Properties();
+    properties.put("gobblin.flow.sourceIdentifier", "dynamicsCrm");
+    properties.put("gobblin.flow.destinationIdentifier", "espresso");
+    flowConfig.setProperties(new StringMap(Maps.fromProperties(properties)));
+
+    testFlowSpec(flowConfig);
+    testSerDer(flowConfig);
+  }
+
+  public void testFlowConfigWithDefaultRunImmediately() {
+    FlowConfig flowConfig = new FlowConfig().setId(new FlowId()
+        .setFlowName("SN_CRMSYNC")
+        .setFlowGroup("DYNAMICS-USER-123456789"));
+    flowConfig.setSchedule(new Schedule()
+        .setCronSchedule("0 58 2/12 ? * * *"));
+
+    flowConfig.setTemplateUris("FS:///my.template");
+    Properties properties = new Properties();
+    properties.put("gobblin.flow.sourceIdentifier", "dynamicsCrm");
+    properties.put("gobblin.flow.destinationIdentifier", "espresso");
+    flowConfig.setProperties(new StringMap(Maps.fromProperties(properties)));
+
+    testFlowSpec(flowConfig);
+    testSerDer(flowConfig);
+  }
+
+  public void testFlowConfigWithoutTemplateUri() {
+    FlowConfig flowConfig = new FlowConfig().setId(new FlowId()
+        .setFlowName("SN_CRMSYNC")
+        .setFlowGroup("DYNAMICS-USER-123456789"));
+    flowConfig.setSchedule(new Schedule()
+        .setCronSchedule("0 58 2/12 ? * * *"));
+
+    Properties properties = new Properties();
+    properties.put("gobblin.flow.sourceIdentifier", "dynamicsCrm");
+    properties.put("gobblin.flow.destinationIdentifier", "espresso");
+    flowConfig.setProperties(new StringMap(Maps.fromProperties(properties)));
+
+    try {
+      FlowConfigResourceLocalHandler.createFlowSpecForConfig(flowConfig);
+      Assert.fail("Should not get to here");
+    } catch (RequiredFieldNotPresentException e) {
+      Assert.assertTrue(true, "templateUri cannot be empty");
+    }
+    testSerDer(flowConfig);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-service/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/log4j.xml 
b/gobblin-service/src/test/resources/log4j.xml
new file mode 100644
index 0000000..b13377c
--- /dev/null
+++ b/gobblin-service/src/test/resources/log4j.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/";>
+  <appender name="console" class="org.apache.log4j.ConsoleAppender">
+    <param name="Target" value="System.out" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern"
+        value="%d{yyyy-MM-dd HH:mm:ss z} %-5p [%t] %C %L - %m%n" />
+    </layout>
+  </appender>
+
+  <root>
+    <level value="info" />
+    <appender-ref ref="console" />
+  </root>
+
+  <!-- Swallow annoying exceptions when creating a configuration. -->
+  <logger name="org.apache.hadoop.conf.Configuration">
+    <level value="FATAL"/>
+  </logger>
+
+</log4j:configuration>

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/01302a6d/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index ac7abcf..fb1e590 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -17,13 +17,16 @@
 
 package org.apache.gobblin.util;
 
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.lang3.StringUtils;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 
 
@@ -79,4 +82,20 @@ public class PropertiesUtils {
 
     return extractedProperties;
   }
+
+  public static String serialize(Properties properties) throws IOException {
+    StringWriter outputWriter = new StringWriter();
+    properties.store(outputWriter, "");
+    String rst = outputWriter.toString();
+    outputWriter.close();
+    return rst;
+  }
+
+  public static Properties deserialize(String serialized) throws IOException {
+    StringReader reader = new StringReader(serialized);
+    Properties properties = new Properties();
+    properties.load(reader);
+    reader.close();
+    return properties;
+  }
 }

Reply via email to