http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TimerTestBase.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TimerTestBase.java 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TimerTestBase.java
new file mode 100755
index 0000000..68740e4
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TimerTestBase.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.wink.json4j.JSONException;
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.rules.Stopwatch;
+import org.junit.runner.Description;
+
+import com.google.common.io.Files;
+
+public class TimerTestBase {
+       static final String logFilePath = "/tmp/odf-test-execution-log.csv";
+       static Map<String, HashMap<String, Long>> testTimeMap = new 
HashMap<String, HashMap<String, Long>>();
+       final static Logger logger = ODFTestLogger.get();
+
+       @Rule
+       public Stopwatch timeWatcher = new Stopwatch() {
+               @Override
+               protected void finished(long nanos, Description description) {
+                       HashMap<String, Long> testMap = 
testTimeMap.get(description.getClassName());
+                       if (testMap == null) {
+                               testMap = new HashMap<String, Long>();
+                               testTimeMap.put(description.getClassName(), 
testMap);
+                       }
+                       testMap.put(description.getMethodName(), (nanos / 1000 
/ 1000));
+               }
+       };
+
+       @AfterClass
+       public static void tearDownAndLogTimes() throws JSONException {
+               try {
+                       File logFile = new File(logFilePath);
+                       Set<String> uniqueRows = new HashSet<String>();
+                       if (logFile.exists()) {
+                               uniqueRows = new 
HashSet<String>(Files.readLines(logFile, StandardCharsets.UTF_8));
+                       }
+
+                       for (Entry<String, HashMap<String, Long>> entry : 
testTimeMap.entrySet()) {
+                               for (Entry<String, Long> testEntry : 
entry.getValue().entrySet()) {
+                                       String logRow = new 
StringBuilder().append(testEntry.getKey()).append(",").append(testEntry.getValue()).append(",").append(entry.getKey()).append(",")
+                                                       
.append(System.getProperty("odf.build.project.name", 
"ProjectNameNotDefined")).toString();
+                                       uniqueRows.add(logRow);
+                               }
+                       }
+
+                       StringBuilder logContent = new StringBuilder();
+                       Iterator<String> rowIterator = uniqueRows.iterator();
+                       while (rowIterator.hasNext()) {
+                               logContent.append(rowIterator.next());
+                               if (rowIterator.hasNext()) {
+                                       logContent.append("\n");
+                               }
+                       }
+
+                       logger.info("Total time consumed by succeeded tests:\n" 
+ logContent.toString());
+                       logFile.createNewFile();
+                       Files.write(logContent.toString().getBytes("UTF-8"), 
logFile);
+               } catch (IOException e) {
+                       logger.warning("Error writing test execution log");
+                       e.printStackTrace();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationExtensionTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationExtensionTest.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationExtensionTest.java
new file mode 100755
index 0000000..7a1f0ed
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationExtensionTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.annotation;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.fasterxml.jackson.core.Version;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+import org.apache.atlas.odf.json.AnnotationDeserializer;
+import org.apache.atlas.odf.json.AnnotationSerializer;
+
+public class AnnotationExtensionTest extends TimerTestBase {
+
+       static Logger logger = ODFTestLogger.get();
+
+       public static <T> T readJSONObjectFromFileInClasspath(ObjectMapper om, 
Class<T> cl, String pathToFile, ClassLoader classLoader) {
+               if (classLoader == null) {
+                       // use current classloader if not provided
+                       classLoader = 
AnnotationExtensionTest.class.getClassLoader();
+               }
+               InputStream is = classLoader.getResourceAsStream(pathToFile);
+               T result = null;
+               try {
+                       result = om.readValue(is, cl);
+               } catch (IOException e) {
+                       // assume that this is a severe error since the 
provided JSONs should be correct
+                       throw new RuntimeException(e);
+               }
+
+               return result;
+       }
+
+       @Test
+       public void testWithUtils() throws Exception {
+               testSimple(JSONUtils.getGlobalObjectMapper());
+       }
+
+       @Test
+       public void testWithSeparateObjectMapper() throws Exception {
+               ObjectMapper om = new ObjectMapper();
+               SimpleModule mod = new SimpleModule("annotation module", 
Version.unknownVersion());
+               mod.addDeserializer(Annotation.class, new 
AnnotationDeserializer());
+               mod.addSerializer(Annotation.class, new AnnotationSerializer());
+               om.registerModule(mod);
+               testSimple(om);
+       }
+
+       private void testSimple(ObjectMapper om) throws Exception {
+               ExtensionTestAnnotation newTestAnnot = new 
ExtensionTestAnnotation();
+               String strValue = "newstring1";
+               int intValue = 4237;
+               newTestAnnot.setNewStringProp1(strValue);
+               newTestAnnot.setNewIntProp2(intValue);
+//             String newTestAnnotJSON = om.writeValueAsString(newTestAnnot);
+               String newTestAnnotJSON = 
JSONUtils.toJSON(newTestAnnot).toString();
+               logger.info("New test annot JSON: " + newTestAnnotJSON);
+
+               logger.info("Deserializing with " + 
Annotation.class.getSimpleName() + "class as target class");
+               Annotation annot1 = om.readValue(newTestAnnotJSON, 
Annotation.class);
+               Assert.assertNotNull(annot1);
+               logger.info("Deserialized annotation JSON (target: " + 
Annotation.class.getSimpleName() + "): " + om.writeValueAsString(annot1));
+               logger.info("Deserialized annotation class (target: " + 
Annotation.class.getSimpleName() + "): " + annot1.getClass().getName());
+               Assert.assertEquals(ExtensionTestAnnotation.class, 
annot1.getClass());
+               ExtensionTestAnnotation extAnnot1 = (ExtensionTestAnnotation) 
annot1;
+               Assert.assertEquals(strValue, extAnnot1.getNewStringProp1());
+               Assert.assertEquals(intValue, extAnnot1.getNewIntProp2());
+
+               /* This does not make sense as you would never enter 
ExtensionTestAnnotation.class as deserialization target
+                * which would enforce usage of the standard Bean serializer 
(since no serializer is registered for this specific class -> jsonProperties 
can not be mapped
+               logger.info("Calling deserialization with " + 
ExtensionTestAnnotation.class.getSimpleName() + " as target");
+               ExtensionTestAnnotation annot2 = om.readValue(newTestAnnotJSON, 
ExtensionTestAnnotation.class);
+               Assert.assertNotNull(annot2);
+               logger.info("Deserialized annotation JSON (target: " + 
ExtensionTestAnnotation.class.getSimpleName() + "): " + 
om.writeValueAsString(annot2));
+               logger.info("Deserialized annotation class (target: " + 
ExtensionTestAnnotation.class.getSimpleName() + "): " + 
annot2.getClass().getName());
+               Assert.assertEquals(ExtensionTestAnnotation.class, 
annot2.getClass());
+               String s = annot2.getNewStringProp1();
+               Assert.assertEquals(strValue, annot2.getNewStringProp1());
+               Assert.assertEquals(intValue, annot2.getNewIntProp2()); */
+
+               logger.info("Processing profiling annotation...");
+               Annotation unknownAnnot = readJSONObjectFromFileInClasspath(om, 
Annotation.class, 
"org/apache/atlas/odf/core/test/annotation/annotexttest1.json", null);
+               Assert.assertNotNull(unknownAnnot);
+               logger.info("Read Unknown annotation: " + 
unknownAnnot.getClass().getName());
+               Assert.assertEquals(ProfilingAnnotation.class, 
unknownAnnot.getClass());
+
+               logger.info("Read profiling annotation: " + 
om.writeValueAsString(unknownAnnot));
+               JSONObject jsonPropertiesObj = new 
JSONObject(unknownAnnot.getJsonProperties());
+               Assert.assertEquals("newProp1Value", 
jsonPropertiesObj.get("newProp1"));
+               Assert.assertEquals((Integer) 4237, 
jsonPropertiesObj.get("newProp2"));
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationStoreTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationStoreTest.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationStoreTest.java
new file mode 100755
index 0000000..b65ce17
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationStoreTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.annotation;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+
+public class AnnotationStoreTest extends ODFTestcase {
+
+       private AnnotationStore createAnnotationStore() {
+               return new DefaultStatusQueueStore();
+       }
+       
+       @Test
+       public void testStoreProfilingAnnotation() throws Exception {
+               AnnotationStore as = createAnnotationStore();
+               
+               String modRef1Id = UUID.randomUUID().toString();
+               MetaDataObjectReference mdoref1 = new MetaDataObjectReference();
+               mdoref1.setId(modRef1Id);
+               
+               ProfilingAnnotation annot1 = new ProfilingAnnotation();
+               annot1.setJsonProperties("{\"a\": \"b\"}");
+               annot1.setAnnotationType("AnnotType1");
+               annot1.setProfiledObject(mdoref1);
+
+               MetaDataObjectReference annot1Ref = as.store(annot1);
+               Assert.assertNotNull(annot1Ref.getId());
+               List<Annotation> retrievedAnnots = as.getAnnotations(mdoref1, 
null);
+               Assert.assertEquals(1, retrievedAnnots.size());
+               
+               Annotation retrievedAnnot = retrievedAnnots.get(0);
+               Assert.assertTrue(annot1 != retrievedAnnot);
+               Assert.assertTrue(retrievedAnnot instanceof 
ProfilingAnnotation);
+               ProfilingAnnotation retrievedProfilingAnnotation = 
(ProfilingAnnotation) retrievedAnnot;
+               Assert.assertEquals(modRef1Id, 
retrievedProfilingAnnotation.getProfiledObject().getId());
+               Assert.assertEquals(annot1Ref, retrievedAnnot.getReference());
+               
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/ExtensionTestAnnotation.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/ExtensionTestAnnotation.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/ExtensionTestAnnotation.java
new file mode 100755
index 0000000..cd8f695
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/ExtensionTestAnnotation.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.annotation;
+
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+
+class ExtensionTestAnnotation extends ProfilingAnnotation {
+
+       private String newStringProp1;
+       private int newIntProp2;
+
+       public String getNewStringProp1() {
+               return newStringProp1;
+       }
+
+       public void setNewStringProp1(String newStringProp1) {
+               this.newStringProp1 = newStringProp1;
+       }
+
+       public int getNewIntProp2() {
+               return newIntProp2;
+       }
+
+       public void setNewIntProp2(int newIntProp2) {
+               this.newIntProp2 = newIntProp2;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingExtendedAnnotations.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingExtendedAnnotations.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingExtendedAnnotations.java
new file mode 100755
index 0000000..f65e3ad
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingExtendedAnnotations.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.annotation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResult;
+import 
org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+
+public class TestSyncDiscoveryServiceWritingExtendedAnnotations extends 
DiscoveryServiceBase implements SyncDiscoveryService {
+       Logger logger = ODFTestLogger.get();
+
+       public static class SyncDiscoveryServiceAnnotation extends 
ProfilingAnnotation {
+               private String prop1 = "";
+               private int prop2 = 4237;
+               private MyObject prop3 = new MyObject();
+
+               public String getProp1() {
+                       return prop1;
+               }
+
+               public void setProp1(String prop1) {
+                       this.prop1 = prop1;
+               }
+
+               public int getProp2() {
+                       return prop2;
+               }
+
+               public void setProp2(int prop2) {
+                       this.prop2 = prop2;
+               }
+
+               public MyObject getProp3() {
+                       return prop3;
+               }
+
+               public void setProp3(MyObject prop3) {
+                       this.prop3 = prop3;
+               }
+
+       }
+
+       public static class MyObject {
+               private String anotherProp = "";
+
+               public String getAnotherProp() {
+                       return anotherProp;
+               }
+
+               public void setAnotherProp(String anotherProp) {
+                       this.anotherProp = anotherProp;
+               }
+
+               private MyOtherObject yetAnotherProp = new MyOtherObject();
+
+               public MyOtherObject getYetAnotherProp() {
+                       return yetAnotherProp;
+               }
+
+               public void setYetAnotherProp(MyOtherObject yetAnotherProp) {
+                       this.yetAnotherProp = yetAnotherProp;
+               }
+
+       }
+
+       public static class MyOtherObject {
+               private String myOtherObjectProperty = "";
+
+               public String getMyOtherObjectProperty() {
+                       return myOtherObjectProperty;
+               }
+
+               public void setMyOtherObjectProperty(String 
myOtherObjectProperty) {
+                       this.myOtherObjectProperty = myOtherObjectProperty;
+               }
+
+       }
+
+       @Override
+       public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest 
request) {
+               try {
+                       MetaDataObjectReference dataSetRef = 
request.getDataSetContainer().getDataSet().getReference();
+
+                       List<Annotation> annotations = new ArrayList<>();
+                       SyncDiscoveryServiceAnnotation annotation1 = new 
SyncDiscoveryServiceAnnotation();
+                       String annotation1_prop1 = "prop1_1_" + 
dataSetRef.getUrl();
+                       annotation1.setProp1(annotation1_prop1);
+                       annotation1.setProp2(annotation1_prop1.hashCode());
+                       annotation1.setProfiledObject(dataSetRef);
+                       MyObject mo1 = new MyObject();
+                       MyOtherObject moo1 = new MyOtherObject();
+                       moo1.setMyOtherObjectProperty("nestedtwolevels" + 
annotation1_prop1);
+                       mo1.setYetAnotherProp(moo1);
+                       mo1.setAnotherProp("nested" + annotation1_prop1);
+                       annotation1.setProp3(mo1);
+                       annotations.add(annotation1);
+
+                       SyncDiscoveryServiceAnnotation annotation2 = new 
SyncDiscoveryServiceAnnotation();
+                       String annotation2_prop1 = "prop1_2_" + 
dataSetRef.getUrl();
+                       annotation2.setProp1(annotation2_prop1);
+                       annotation2.setProp2(annotation2_prop1.hashCode());
+                       annotation2.setProfiledObject(dataSetRef);
+                       MyObject mo2 = new MyObject();
+                       MyOtherObject moo2 = new MyOtherObject();
+                       moo2.setMyOtherObjectProperty("nestedtwolevels" + 
annotation2_prop1);
+                       mo2.setYetAnotherProp(moo2);
+                       mo2.setAnotherProp("nested" + annotation2_prop1);
+                       annotation2.setProp3(mo2);
+                       annotations.add(annotation2);
+
+                       DiscoveryServiceSyncResponse resp = new 
DiscoveryServiceSyncResponse();
+                       resp.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+                       DiscoveryServiceResult dsResult = new 
DiscoveryServiceResult();
+                       dsResult.setAnnotations(annotations);
+                       resp.setResult(dsResult);
+                       resp.setDetails(this.getClass().getName() + 
".runAnalysis finished OK");
+
+                       logger.info("Returning from discovery service " + 
this.getClass().getSimpleName() + " with result: " + JSONUtils.toJSON(resp));
+                       return resp;
+               } catch (Exception exc) {
+                       throw new RuntimeException(exc);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingJsonAnnotations.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingJsonAnnotations.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingJsonAnnotations.java
new file mode 100755
index 0000000..91b544c
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingJsonAnnotations.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.annotation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResult;
+import 
org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+
+public class TestSyncDiscoveryServiceWritingJsonAnnotations extends 
DiscoveryServiceBase implements SyncDiscoveryService {
+       Logger logger = ODFTestLogger.get();
+       private String annotationResult = 
Utils.getInputStreamAsString(this.getClass().getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/integrationtest/metadata/internal/atlas/nested_annotation_example.json"),
 "UTF-8");
+
+       @Override
+       public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest 
request) {
+               try {
+                       MetaDataObjectReference dataSetRef = 
request.getDataSetContainer().getDataSet().getReference();
+
+                       List<Annotation> annotations = new ArrayList<>();
+                       ProfilingAnnotation annotation1 = new 
ProfilingAnnotation();
+                       annotation1.setProfiledObject(dataSetRef);
+                       annotation1.setJsonProperties(annotationResult);
+                       
annotation1.setAnnotationType("JsonAnnotationWriteTest");
+                       annotation1.setJavaClass("JsonAnnotationWriteTest");
+                       annotations.add(annotation1);
+
+                       DiscoveryServiceSyncResponse resp = new 
DiscoveryServiceSyncResponse();
+                       resp.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+                       DiscoveryServiceResult dsResult = new 
DiscoveryServiceResult();
+                       dsResult.setAnnotations(annotations);
+                       resp.setResult(dsResult);
+                       resp.setDetails(this.getClass().getName() + 
".runAnalysis finished OK");
+
+                       logger.info("Returning from discovery service " + 
this.getClass().getSimpleName() + " with result: " + JSONUtils.toJSON(resp));
+                       return resp;
+               } catch (Exception exc) {
+                       throw new RuntimeException(exc);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ODFConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ODFConfigurationTest.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ODFConfigurationTest.java
new file mode 100755
index 0000000..b1d2518
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ODFConfigurationTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.configuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+import org.apache.atlas.odf.api.settings.KafkaMessagingConfiguration;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.core.configuration.ConfigManager;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * this test uses a mocked storage therefore no zookeeper is required
+ */
+public class ODFConfigurationTest extends ODFTestcase {
+
+       Logger logger = ODFTestLogger.get();
+
+       @Before
+       public void setupDefaultConfig() throws JsonParseException, 
JsonMappingException, IOException, ValidationException, JSONException {
+               logger.info("reset config to default");
+               InputStream is = 
ODFConfigurationTest.class.getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/test/internal/odf-initial-configuration.json");
+               ConfigContainer defaultConfig = new 
ObjectMapper().readValue(is, ConfigContainer.class);
+               ConfigManager configManager = new 
ODFInternalFactory().create(ConfigManager.class);
+               configManager.updateConfigContainer(defaultConfig);
+       }
+
+       @Test
+       public void testUserDefinedMerge() throws JsonParseException, 
JsonMappingException, IOException {
+               InputStream is = 
ODFConfigurationTest.class.getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/test/internal/odf-initial-configuration.json");
+               ConfigContainer defaultConfig;
+               defaultConfig = new ObjectMapper().readValue(is, 
ConfigContainer.class);
+               //set testProps to defaultValues to be overwritten
+               defaultConfig.getOdf().getUserDefined().put("testProp", 
"defaultValue");
+               defaultConfig.getOdf().getUserDefined().put("testProp2", 
"defaultValue");
+               logger.info("Read config: " + defaultConfig);
+
+               //config example with userdefined property testProp to 123
+               String value = "{\r\n\t\"odf\" : {\r\n\t\"userDefined\" : 
{\r\n\t\t\"testProp\" : 123\r\n\t}\r\n}\r\n}\r\n";
+               ConfigContainer props = new ObjectMapper().readValue(value, 
ConfigContainer.class);
+               Utils.mergeODFPOJOs(defaultConfig, props);
+               logger.info("Mergded config: " + defaultConfig);
+
+               Assert.assertEquals(123, 
defaultConfig.getOdf().getUserDefined().get("testProp"));
+               Assert.assertEquals("defaultValue", 
defaultConfig.getOdf().getUserDefined().get("testProp2"));
+       }
+
+       @Test
+       public void testValidation() throws JsonParseException, 
JsonMappingException, IOException {
+               boolean exceptionOccured = false;
+               String value = "{\r\n\t\"odf\" : 
{\r\n\t\t\"discoveryServiceWatcherWaitMs\" : -5\r\n\t}\r\n}\r\n";
+               try {
+                       ConfigContainer props = new 
ObjectMapper().readValue(value, ConfigContainer.class);
+                       props.validate();
+               } catch (ValidationException e) {
+                       exceptionOccured = true;
+               }
+
+               Assert.assertTrue(exceptionOccured);
+       }
+
+       @Test
+       public void testMerge() throws JsonParseException, 
JsonMappingException, IOException {
+               InputStream is = 
ODFConfigurationTest.class.getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/test/internal/odf-initial-configuration.json");
+               ConfigContainer defaultConfig;
+               defaultConfig = new ObjectMapper().readValue(is, 
ConfigContainer.class);
+               //config example with ODF - queueConsumerWaitMs property value 
777
+               String value = "{\r\n\t\"odf\" : 
{\r\n\t\t\"discoveryServiceWatcherWaitMs\" : 777\r\n\t}\r\n}\r\n";
+               ConfigContainer props = new ObjectMapper().readValue(value, 
ConfigContainer.class);
+               Utils.mergeODFPOJOs(defaultConfig, props);
+
+               // TODOCONFIG, move next line to kafka tests
+               // Assert.assertEquals(777, 
defaultConfig.getOdf().getQueueConsumerWaitMs().intValue());
+       }
+
+       @Test
+       public void testDeepMerge() throws JsonParseException, 
JsonMappingException, IOException {
+               InputStream is = 
ODFConfigurationTest.class.getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/test/internal/odf-initial-configuration.json");
+               ConfigContainer defaultConfig;
+               defaultConfig = new ObjectMapper().readValue(is, 
ConfigContainer.class);
+               //config example with ODF - kafkaConsumer - offsetsStorage 
property value TEST. All other values for the kafkaConsumer should stay the 
same!
+               String value = "{\r\n\t\"odf\" : 
{\r\n\"messagingConfiguration\": { \"type\": \"" + 
KafkaMessagingConfiguration.class.getName()
+                               + "\", \t\t\"kafkaConsumerConfig\" : { 
\r\n\t\t\t\"offsetsStorage\" : \"TEST\"\r\n\t\t}\r\n\t}\r\n}}\r\n";
+               ConfigContainer props = new ObjectMapper().readValue(value, 
ConfigContainer.class);
+               Utils.mergeODFPOJOs(defaultConfig, props);
+
+               // TODOCONFIG
+               //              Assert.assertEquals("TEST", 
defaultConfig.getOdf().getKafkaConsumerConfig().getOffsetsStorage());
+               //make sure the rest is still default
+               //              Assert.assertEquals(400, 
defaultConfig.getOdf().getKafkaConsumerConfig().getZookeeperSessionTimeoutMs().intValue());
+       }
+
+       @Test
+       public void testGet() {
+               Assert.assertTrue(new 
ODFFactory().create().getSettingsManager().getODFSettings().isReuseRequests());
+       }
+
+       @Test
+       public void testPut() throws InterruptedException, IOException, 
ValidationException, JSONException, ServiceNotFoundException {
+               SettingsManager config = new 
ODFFactory().create().getSettingsManager();
+               String propertyId = "my_dummy_test_property";
+               int testNumber = 123;
+               Map<String, Object> cont = config.getUserDefinedConfig();
+               cont.put(propertyId, testNumber);
+               config.updateUserDefined(cont);
+               Assert.assertEquals(testNumber, 
config.getUserDefinedConfig().get(propertyId));
+
+               String testString = "test";
+               cont.put(propertyId, testString);
+               config.updateUserDefined(cont);
+
+               Assert.assertEquals(testString, 
config.getUserDefinedConfig().get(propertyId));
+
+               JSONObject testJson = new JSONObject();
+               testJson.put("testProp", "test");
+               cont.put(propertyId, testJson);
+               config.updateUserDefined(cont);
+
+               Assert.assertEquals(testJson, 
config.getUserDefinedConfig().get(propertyId));
+
+               ODFSettings settings = config.getODFSettings();
+               logger.info("Last update object: " + 
JSONUtils.toJSON(settings));
+               Assert.assertNotNull(settings);
+               Assert.assertNotNull(settings.getUserDefined());
+               Assert.assertNotNull(settings.getUserDefined().get(propertyId));
+               logger.info("User defined object: " + 
settings.getUserDefined().get(propertyId).getClass());
+               @SuppressWarnings("unchecked")
+               Map<String, Object> notifiedNestedJSON = (Map<String, Object>) 
settings.getUserDefined().get(propertyId);
+               Assert.assertNotNull(notifiedNestedJSON.get("testProp"));
+               Assert.assertTrue(notifiedNestedJSON.get("testProp") instanceof 
String);
+               Assert.assertEquals("test", notifiedNestedJSON.get("testProp"));
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/PasswordEncryptionTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/PasswordEncryptionTest.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/PasswordEncryptionTest.java
new file mode 100755
index 0000000..aea9a30
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/PasswordEncryptionTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.configuration;
+
+import java.util.logging.Logger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.Encryption;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.api.settings.SparkConfig;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class PasswordEncryptionTest extends TimerTestBase {
+       Logger logger = ODFTestLogger.get();
+       private static final String SPARK_PASSWORD_CONFIG = 
"spark.authenticate.secret";
+
+       @Test
+       public void testGeneralPasswordEncryption() throws Exception {
+               SettingsManager settings = new 
ODFFactory().create().getSettingsManager();
+               ODFSettings settingsWithPlainPasswords = 
settings.getODFSettingsHidePasswords();
+               settingsWithPlainPasswords.setOdfPassword("newOdfPassword");
+               logger.info("Settings with plain password: " + 
JSONUtils.toJSON(settingsWithPlainPasswords));
+               settings.updateODFSettings(settingsWithPlainPasswords);
+
+               ODFSettings settingsWithHiddenPasswords = 
settings.getODFSettingsHidePasswords();
+               String hiddenPasswordIdentifyier = "***hidden***";
+               Assert.assertEquals(hiddenPasswordIdentifyier, 
settingsWithHiddenPasswords.getOdfPassword());
+               logger.info("Settings with hidden password: " + 
JSONUtils.toJSON(settingsWithHiddenPasswords));
+
+               ODFSettings settingsWithEncryptedPassword = 
settings.getODFSettings();
+               Assert.assertEquals("newOdfPassword", 
Encryption.decryptText(settingsWithEncryptedPassword.getOdfPassword()));
+               logger.info("Settings with encrypted password: " + 
JSONUtils.toJSON(settingsWithEncryptedPassword));
+
+               // When overwriting settings with hidden passwords, encrypted 
passwords must be kept internally
+               settings.updateODFSettings(settingsWithHiddenPasswords);
+               settingsWithEncryptedPassword = settings.getODFSettings();
+               Assert.assertEquals("newOdfPassword", 
Encryption.decryptText(settingsWithEncryptedPassword.getOdfPassword()));
+       }
+
+       @Test
+       public void testSparkConfigEncryption() throws Exception {
+               SettingsManager settings = new 
ODFFactory().create().getSettingsManager();
+               SparkConfig plainSparkConfig = new SparkConfig();
+               plainSparkConfig.setConfig(SPARK_PASSWORD_CONFIG, 
"plainConfigValue");
+               ODFSettings settingsWithPlainPasswords = 
settings.getODFSettings();
+               settingsWithPlainPasswords.setSparkConfig(plainSparkConfig);;
+               logger.info("Settings with plain password: " + 
JSONUtils.toJSON(settingsWithPlainPasswords));
+               settings.updateODFSettings(settingsWithPlainPasswords);
+
+               ODFSettings settingsWithHiddenPasswords = 
settings.getODFSettingsHidePasswords();
+               String hiddenPasswordIdentifyier = "***hidden***";
+               String hiddenConfigValue = (String) 
settingsWithHiddenPasswords.getSparkConfig().getConfigs().get(SPARK_PASSWORD_CONFIG);
+               Assert.assertEquals(hiddenPasswordIdentifyier, 
hiddenConfigValue);
+               logger.info("Config with hidden password: " + 
JSONUtils.toJSON(settingsWithHiddenPasswords));
+
+               ODFSettings settingsWithEncryptedPassword = 
settings.getODFSettings();
+               String encryptedConfigValue = (String) 
settingsWithEncryptedPassword.getSparkConfig().getConfigs().get(SPARK_PASSWORD_CONFIG);
+               Assert.assertEquals("plainConfigValue", 
Encryption.decryptText(encryptedConfigValue));
+               logger.info("Config with encrypted password: " + 
JSONUtils.toJSON(settingsWithEncryptedPassword));
+
+               // When overwriting settings with hidden passwords, encrypted 
passwords must be kept internally
+               settings.updateODFSettings(settingsWithHiddenPasswords);
+               encryptedConfigValue = (String) 
settingsWithEncryptedPassword.getSparkConfig().getConfigs().get(SPARK_PASSWORD_CONFIG);
+               Assert.assertEquals("plainConfigValue", 
Encryption.decryptText(encryptedConfigValue));
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ValidationTests.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ValidationTests.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ValidationTests.java
new file mode 100755
index 0000000..3db5778
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ValidationTests.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.configuration;
+
+import java.util.Collections;
+
+import org.apache.atlas.odf.api.settings.validation.EnumValidator;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.configuration.ConfigContainer;
+import org.apache.atlas.odf.core.configuration.ConfigManager;
+import org.apache.atlas.odf.core.configuration.ServiceValidator;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.settings.validation.ImplementationValidator;
+import org.apache.atlas.odf.api.settings.validation.NumberPositiveValidator;
+import org.apache.atlas.odf.api.settings.validation.PropertyValidator;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+import 
org.apache.atlas.odf.core.test.discoveryservice.TestAsyncDiscoveryServiceWritingAnnotations1;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class ValidationTests extends TimerTestBase {
+
+       @Test
+       public void testEnum() {
+               String[] vals = new String[] { "test", "test2" };
+               String correct = "test";
+               String incorrect = "fail";
+
+               Assert.assertTrue(validateTest(correct, new 
EnumValidator(vals)));
+               Assert.assertFalse(validateTest(incorrect, new 
EnumValidator(vals)));
+       }
+
+       @Test
+       public void testImplementation() {
+               String correct = 
TestAsyncDiscoveryServiceWritingAnnotations1.class.getName();
+               String incorrect = "dummyClass";
+               Assert.assertTrue(validateTest(correct, new 
ImplementationValidator()));
+               Assert.assertFalse(validateTest(incorrect, new 
ImplementationValidator()));
+       }
+
+       @Test
+       public void testService() throws Exception {
+               String s = "{\r\n" + 
+                               "                       \"id\": 
\"asynctestservice\",\r\n" + 
+                               "                       \"name\": \"Async 
test\",\r\n" + 
+                               "                       \"description\": \"The 
async test service\",\r\n" + 
+                               "                       \"endpoint\": {\r\n" + 
+                               "                               
\"runtimeName\": \"Java\",\r\n" + 
+                               "                               \"className\": 
\"TestAsyncDiscoveryService1\"\r\n" +
+                               "                       }\r\n" + 
+                               "               }";
+               
+               DiscoveryServiceProperties newService = JSONUtils.fromJSON(s, 
DiscoveryServiceProperties.class);
+               DiscoveryServiceManager discoveryServicesManager = new 
ODFFactory().create().getDiscoveryServiceManager();
+               //ODFConfig odfConfig = new 
ODFFactory().create(ODFConfiguration.class).getODFConfig();
+
+               ConfigContainer new1 = new ConfigContainer();
+               
new1.setRegisteredServices(Collections.singletonList(newService));
+               ConfigManager configManager = new 
ODFInternalFactory().create(ConfigManager.class);
+               configManager.updateConfigContainer(new1);
+               
+               DiscoveryServiceProperties correct = 
discoveryServicesManager.getDiscoveryServicesProperties().get(0);
+               Assert.assertEquals("asynctestservice", correct.getId());
+               correct.setId("newId");
+               DiscoveryServiceProperties incorrect = new 
DiscoveryServiceProperties();
+               Assert.assertTrue(validateTest(correct, new 
ServiceValidator()));
+               Assert.assertFalse(validateTest(incorrect, new 
ServiceValidator()));
+       }
+
+       @Test
+       public void testNumber() {
+               int correct = 5;
+               int incorrect = -5;
+               Assert.assertTrue(validateTest(correct, new 
NumberPositiveValidator()));
+               Assert.assertFalse(validateTest(incorrect, new 
NumberPositiveValidator()));
+       }
+
+       private boolean validateTest(Object value, PropertyValidator validator) 
{
+               try {
+                       validator.validate(null, value);
+                       return true;
+               } catch (ValidationException ex) {
+                       return false;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisProcessingTests.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisProcessingTests.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisProcessingTests.java
new file mode 100755
index 0000000..4fa2eda
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisProcessingTests.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import java.text.MessageFormat;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisCancelResult;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.settings.MessagingConfiguration;
+import org.apache.atlas.odf.api.settings.ODFSettings;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+import org.apache.atlas.odf.json.JSONUtils;
+
+public class AnalysisProcessingTests extends ODFTestcase {
+       Logger logger = ODFTestLogger.get();
+
+       @Test
+       public void testAnalysisProcessingAfterShutdown() throws Exception {
+               final SettingsManager config = new 
ODFFactory().create().getSettingsManager();
+               final ODFSettings odfSettings = config.getODFSettings();
+               final MessagingConfiguration messagingConfiguration = 
odfSettings.getMessagingConfiguration();
+               final Long origRequestRetentionMs = 
messagingConfiguration.getAnalysisRequestRetentionMs();
+               messagingConfiguration.setAnalysisRequestRetentionMs(300000l);
+               config.updateODFSettings(odfSettings);
+
+               ControlCenter cc = new 
ODFInternalFactory().create(ControlCenter.class);
+               AnalysisRequestTracker tracker = 
JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, 
"org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null);
+               AnalysisRequest req = tracker.getRequest();
+               
req.setDiscoveryServiceSequence(Arrays.asList("asynctestservice"));
+               req.getDataSets().get(0).setId(ODFAPITest.DUMMY_SUCCESS_ID + 
"_dataset");
+               final AnalysisResponse startRequest = cc.startRequest(req);
+               logger.info("Analysis :" + startRequest.getId());
+
+               Assert.assertNull(startRequest.getOriginalRequest());
+               Assert.assertFalse(startRequest.isInvalidRequest());
+               final AnalysisResponse duplicate = cc.startRequest(req);
+               Assert.assertNotNull(duplicate.getOriginalRequest());
+               Assert.assertEquals(startRequest.getId(), duplicate.getId());
+               logger.info("Analysis1 duplciate :" + duplicate.getId());
+
+               final AnalysisCancelResult cancelRequest = 
cc.cancelRequest(startRequest.getId());
+               Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, 
cancelRequest.getState());
+
+               cc.getQueueManager().stop();
+
+               AnalysisResponse response2 = cc.startRequest(req);
+               logger.info("Analysis2:" + response2.getId());
+               AnalysisRequestStatus requestStatus = 
cc.getRequestStatus(response2.getId());
+               int maxWait = 20;
+
+               int currentWait = 0;
+               while (currentWait < maxWait && requestStatus.getState() != 
AnalysisRequestStatus.State.ACTIVE) {
+                       Thread.sleep(100);
+                       currentWait++;
+                       requestStatus = cc.getRequestStatus(response2.getId());
+               }
+               logger.info("THREAD ACTIVE, KILL IT!");
+
+               cc.getQueueManager().start();
+               logger.info("restarted");
+               Assert.assertNull(response2.getOriginalRequest());
+               Assert.assertFalse(response2.isInvalidRequest());
+
+               
messagingConfiguration.setAnalysisRequestRetentionMs(origRequestRetentionMs);
+               config.updateODFSettings(odfSettings);
+
+               currentWait = 0;
+               while (currentWait < maxWait && requestStatus.getState() != 
AnalysisRequestStatus.State.FINISHED) {
+                       Thread.sleep(100);
+                       requestStatus = cc.getRequestStatus(response2.getId());
+               }
+               Assert.assertEquals(AnalysisRequestStatus.State.FINISHED, 
requestStatus.getState());
+       }
+
+       @Test
+       public void testRequestWithAnnotationTypes() throws Exception {
+               ControlCenter cc = new 
ODFInternalFactory().create(ControlCenter.class);
+               AnalysisRequestTracker tracker = 
JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, 
"org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null);
+               AnalysisRequest req = tracker.getRequest();
+               req.getDataSets().get(0).setId(ODFAPITest.DUMMY_SUCCESS_ID + 
"_dataset");
+               List<String> annotationTypes = Arrays.asList(new String[] { 
"AsyncTestDummyAnnotation" });
+               req.setAnnotationTypes(annotationTypes);
+               logger.info(MessageFormat.format("Running discovery request for 
annotation type {0}.", annotationTypes));
+               AnalysisResponse resp = cc.startRequest(req);
+               logger.info(MessageFormat.format("Started request id {0}.", 
resp.getId()));
+               Assert.assertNotNull(resp.getId());
+               Assert.assertFalse(resp.isInvalidRequest());
+
+               int currentWait = 0;
+               int maxWait = 20;
+               AnalysisRequestStatus requestStatus = 
cc.getRequestStatus(resp.getId());
+               while (currentWait < maxWait && requestStatus.getState() != 
AnalysisRequestStatus.State.FINISHED) {
+                       Thread.sleep(100);
+                       requestStatus = cc.getRequestStatus(resp.getId());
+               }
+               Assert.assertEquals(AnalysisRequestStatus.State.FINISHED, 
requestStatus.getState());
+               Assert.assertEquals("Generated service has incorrect number of 
elements.", 1, requestStatus.getRequest().getDiscoveryServiceSequence().size());
+               Assert.assertEquals("Generated service sequence differs from 
expected value.", "asynctestservice", 
requestStatus.getRequest().getDiscoveryServiceSequence().get(0));
+       }
+
+       @Test
+       public void testRequestWithMissingAnnotationTypes() throws Exception {
+               ControlCenter cc = new 
ODFInternalFactory().create(ControlCenter.class);
+               AnalysisRequestTracker tracker = 
JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, 
"org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null);
+               AnalysisRequest req = tracker.getRequest();
+               req.getDataSets().get(0).setId(ODFAPITest.DUMMY_SUCCESS_ID + 
"_dataset");
+               List<String> annotationTypes = Arrays.asList(new String[] { 
"noServiceExistsForThisAnnotationType" });
+               req.setAnnotationTypes(annotationTypes);
+               logger.info(MessageFormat.format("Running discovery request for 
non-existing annotation type {0}.", annotationTypes));
+               AnalysisResponse resp = cc.startRequest(req);
+               Assert.assertTrue(resp.isInvalidRequest());
+               Assert.assertEquals("Unexpected error message.", "No suitable 
discovery services found to create the requested annotation types.", 
resp.getDetails());
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestCancellationTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestCancellationTest.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestCancellationTest.java
new file mode 100755
index 0000000..fd39e15
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestCancellationTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import java.util.Collections;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisCancelResult;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+
+public class AnalysisRequestCancellationTest extends ODFTestcase {
+
+       Logger logger = ODFTestLogger.get();
+
+       AnalysisRequestTracker generateTracker(String id, STATUS status) {
+               AnalysisRequestTracker tracker = new AnalysisRequestTracker();
+               Utils.setCurrentTimeAsLastModified(tracker);
+               tracker.setNextDiscoveryServiceRequest(0);
+               AnalysisRequest req = new AnalysisRequest();
+               req.setId(id);
+               MetaDataObjectReference ref = new MetaDataObjectReference();
+               ref.setId("DataSet" + id);
+               req.setDataSets(Collections.singletonList(ref));
+               tracker.setRequest(req);
+               tracker.setStatus(status);
+               return tracker;
+       }
+
+       @Test
+       public void testRequestCancellationNotFoundFailure() {
+               ControlCenter cc = new 
ODFInternalFactory().create(ControlCenter.class);
+               AnalysisCancelResult cancelRequest = 
cc.cancelRequest("dummy_id");
+               Assert.assertEquals(cancelRequest.getState(), 
AnalysisCancelResult.State.NOT_FOUND);
+       }
+
+       @Test
+       public void testRequestCancellationWrongStateFailure() {
+               ControlCenter cc = new 
ODFInternalFactory().create(ControlCenter.class);
+               AnalysisRequestTrackerStore store = (new 
ODFInternalFactory()).create(AnalysisRequestTrackerStore.class);
+               String testId = "test_id1";
+               AnalysisRequestTracker tracker = null;
+               AnalysisCancelResult cancelRequest = null;
+               
+               tracker = generateTracker(testId, STATUS.FINISHED);
+               store.store(tracker);
+               cancelRequest = cc.cancelRequest(testId);
+               Assert.assertEquals(cancelRequest.getState(), 
AnalysisCancelResult.State.INVALID_STATE);
+
+               tracker = generateTracker(testId, STATUS.ERROR);
+               store.store(tracker);
+               cancelRequest = cc.cancelRequest(testId);
+               Assert.assertEquals(cancelRequest.getState(), 
AnalysisCancelResult.State.INVALID_STATE);
+
+               tracker = generateTracker(testId, STATUS.CANCELLED);
+               store.store(tracker);
+               cancelRequest = cc.cancelRequest(testId);
+               Assert.assertEquals(cancelRequest.getState(), 
AnalysisCancelResult.State.INVALID_STATE);
+       }
+
+       @Test
+       public void testRequestCancellationSuccess() {
+               ControlCenter cc = new 
ODFInternalFactory().create(ControlCenter.class);
+               AnalysisRequestTrackerStore store = (new 
ODFInternalFactory()).create(AnalysisRequestTrackerStore.class);
+               String testId = "test_id2";
+
+               AnalysisRequestTracker tracker = generateTracker(testId, 
STATUS.INITIALIZED);
+               store.store(tracker);
+               AnalysisCancelResult cancelRequest = cc.cancelRequest(testId);
+               Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, 
cancelRequest.getState());
+
+               tracker = generateTracker(testId, 
STATUS.IN_DISCOVERY_SERVICE_QUEUE);
+               store.store(tracker);
+               cancelRequest = cc.cancelRequest(testId);
+               Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, 
cancelRequest.getState());
+
+               tracker = generateTracker(testId, 
STATUS.DISCOVERY_SERVICE_RUNNING);
+               store.store(tracker);
+               cancelRequest = cc.cancelRequest(testId);
+               Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, 
cancelRequest.getState());
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestTrackerStoreTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestTrackerStoreTest.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestTrackerStoreTest.java
new file mode 100755
index 0000000..7eb46d8
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestTrackerStoreTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.junit.Test;
+
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.ODFTestcase;
+
+public class AnalysisRequestTrackerStoreTest extends ODFTestcase {
+
+       Logger logger = ODFTestLogger.get();
+
+       AnalysisRequestTracker generateTracker(String id, STATUS status) {
+               AnalysisRequestTracker tracker = new AnalysisRequestTracker();
+               Utils.setCurrentTimeAsLastModified(tracker);
+               tracker.setNextDiscoveryServiceRequest(0);
+               AnalysisRequest req = new AnalysisRequest();
+               req.setId(id);
+               MetaDataObjectReference ref = new MetaDataObjectReference();
+               ref.setId("DataSet" + id);
+               req.setDataSets(Collections.singletonList(ref));
+               tracker.setRequest(req);
+               tracker.setStatus(status);
+               return tracker;
+       }
+
+       @Test
+       public void testStore() throws Exception {
+               AnalysisRequestTrackerStore store = (new 
ODFInternalFactory()).create(AnalysisRequestTrackerStore.class);
+               assertNotNull(store);
+               int MAX_TRACKERS = 50;
+               List<AnalysisRequestTracker> trackers1 = new 
ArrayList<AnalysisRequestTracker>();
+               STATUS lastStatus = STATUS.IN_DISCOVERY_SERVICE_QUEUE;
+               for (int i = 0; i < MAX_TRACKERS; i++) {
+                       trackers1.add(generateTracker("STORETEST_ID" + i, 
lastStatus));
+               }
+
+               logger.info("Storing " + MAX_TRACKERS + " Trackers");
+               long pass1Start = System.currentTimeMillis();
+               for (AnalysisRequestTracker tracker : trackers1) {
+                       store.store(tracker);
+               }
+               long pass1End = System.currentTimeMillis();
+
+               logger.info("Storing " + MAX_TRACKERS + " Trackers again with 
new status");
+
+               lastStatus = STATUS.FINISHED;
+               List<AnalysisRequestTracker> trackers2 = new 
ArrayList<AnalysisRequestTracker>();
+               for (int i = 0; i < MAX_TRACKERS; i++) {
+                       trackers2.add(generateTracker("STORETEST_ID" + i, 
lastStatus));
+               }
+               long pass2Start = System.currentTimeMillis();
+               for (AnalysisRequestTracker tracker : trackers2) {
+                       store.store(tracker);
+               }
+               long pass2End = System.currentTimeMillis();
+
+               Thread.sleep(2000);
+               logger.info("Querying and checking " + MAX_TRACKERS + " 
Trackers");
+
+               long queryStart = System.currentTimeMillis();
+
+               for (int i = 0; i < MAX_TRACKERS; i++) {
+                       final String analysisRequestId = "STORETEST_ID" + i;
+                       AnalysisRequestTracker tracker = 
store.query(analysisRequestId);
+                       assertNotNull(tracker);
+                       assertEquals(1, 
tracker.getRequest().getDataSets().size());
+                       MetaDataObjectReference ref = new 
MetaDataObjectReference();
+                       ref.setId("DataSet" + analysisRequestId);
+                       assertEquals(tracker.getRequest().getDataSets().get(0), 
ref);
+                       assertEquals(lastStatus, tracker.getStatus());
+               }
+               long queryEnd = System.currentTimeMillis();
+
+               System.out.println("First pass: " + (pass1End - pass1Start) + 
"ms, second pass: " + (pass2End - pass2Start) + "ms, query: " + (queryEnd - 
queryStart) + "ms");
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DeclarativeRequestMapperTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DeclarativeRequestMapperTest.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DeclarativeRequestMapperTest.java
new file mode 100755
index 0000000..347fb84
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DeclarativeRequestMapperTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceJavaEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.core.controlcenter.DeclarativeRequestMapper;
+import org.apache.atlas.odf.core.test.ODFTestBase;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.controlcenter.ControlCenter;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+
+public class DeclarativeRequestMapperTest extends ODFTestBase {
+       final private static String SERVICE_CLASSNAME = 
"TestAsyncDiscoveryService1";
+       final private static String[] EXPECTED_SERVICE_SEQUENCES = new String[] 
{ "pre3,ser1", "alt1,ser1", "pre4,pre1,ser1", 
+                       "pre3,ser1,ser3", "pre3,ser1,ser5", "alt1,ser1,ser3", 
"alt1,ser1,ser5", "pre3,pre2,ser4", "alt1,pre2,ser4", 
+                       "pre4,pre1,ser1,ser3", "pre4,pre1,ser1,ser5", 
"pre3,ser1,alt1,ser3", "pre3,ser1,pre2,ser4", "pre3,ser1,alt1,ser5" };
+       private Logger logger = Logger.getLogger(ControlCenter.class.getName());
+
+       private static void createDiscoveryService(String serviceId, String[] 
resultingAnnotationTypes, String[] prerequisiteAnnotationTypes, String[] 
supportedObjectTypes) throws ValidationException, JSONException {
+               DiscoveryServiceManager discoveryServicesManager = new 
ODFFactory().create().getDiscoveryServiceManager();
+               DiscoveryServiceProperties dsProperties = new 
DiscoveryServiceProperties();
+               DiscoveryServiceJavaEndpoint dse = new 
DiscoveryServiceJavaEndpoint();
+               dse.setClassName(SERVICE_CLASSNAME);
+               dsProperties.setEndpoint(JSONUtils.convert(dse, 
DiscoveryServiceEndpoint.class));
+               dsProperties.setId(serviceId);
+               dsProperties.setName(serviceId + " Discovery Service");
+               
dsProperties.setPrerequisiteAnnotationTypes(Arrays.asList(prerequisiteAnnotationTypes));
+               
dsProperties.setResultingAnnotationTypes(Arrays.asList(resultingAnnotationTypes));
+               
dsProperties.setSupportedObjectTypes(Arrays.asList(supportedObjectTypes));
+               discoveryServicesManager.createDiscoveryService(dsProperties);
+       }
+
+       private void deleteDiscoveryService(String serviceId, boolean 
failOnError) throws ValidationException {
+               DiscoveryServiceManager discoveryServicesManager = new 
ODFFactory().create().getDiscoveryServiceManager();
+               try {
+                       
discoveryServicesManager.deleteDiscoveryService(serviceId);
+               }
+               catch (ServiceNotFoundException e) {
+                       if (failOnError) {
+                               Assert.fail("Error deleting discovery 
services.");
+                       }
+               }               
+       }
+
+       private void deleteDiscoveryServices(boolean failOnError) throws 
ValidationException {
+               List<String> serviceIds = Arrays.asList(new String[] { "ser1", 
"ser2", "ser3", "ser4", "ser5", "pre1", "pre2", "pre3", "pre4", "alt1" });
+               for (String serviceId : serviceIds) {
+                       deleteDiscoveryService(serviceId, failOnError);
+               }
+       }
+
+       private void createDiscoveryServices() throws ValidationException, 
JSONException {
+               createDiscoveryService("ser1", new String[] { "an1", "com1", 
"com2" }, new String[] { "pre1"         }, new String[] { "Table", "DataFile" 
});
+               createDiscoveryService("ser2", new String[] { "an2", "com1"     
    }, new String[] { "pre2"         }, new String[] { "Table", "DataFile" });
+               createDiscoveryService("ser3", new String[] {                
"com2" }, new String[] { "pre1"         }, new String[] { "Table", "DataFile" 
});
+               createDiscoveryService("ser4", new String[] { "an1", "com1", 
"com2" }, new String[] { "pre1", "pre2" }, new String[] { "Table", "DataFile" 
});
+               createDiscoveryService("ser5", new String[] {        "com1", 
"com2" }, new String[] { "pre1"         }, new String[] { "Table", "DataFile" 
});
+
+               createDiscoveryService("pre1", new String[] { "pre1"            
    }, new String[] { "pre4"         }, new String[] { "Table", "DataFile" });
+               createDiscoveryService("pre2", new String[] { "pre2"            
    }, new String[] {                }, new String[] { "Table", "DataFile" });
+               createDiscoveryService("pre3", new String[] { "pre1"            
    }, new String[] {                }, new String[] { "Table", "DataFile" });
+               createDiscoveryService("pre4", new String[] { "pre4"            
    }, new String[] {                }, new String[] { "Table", "DataFile" });
+
+               createDiscoveryService("alt1", new String[] { "pre1"            
    }, new String[] {                }, new String[] { "Table", "DataFile" });
+       }
+
+       @Test
+       public void testDiscoveryServiceSequences() throws Exception {
+               deleteDiscoveryServices(false);
+               createDiscoveryServices();
+
+               AnalysisRequest request = new AnalysisRequest();
+               request.setAnnotationTypes(Arrays.asList( new String[] { "an1", 
"com2" }));
+               DeclarativeRequestMapper mapper = new 
DeclarativeRequestMapper(request);
+               logger.log(Level.INFO, "Printing list of mapper result to 
stdout.");
+               int i = 0;
+               for (DeclarativeRequestMapper.DiscoveryServiceSequence 
discoveryApproach : mapper.getDiscoveryServiceSequences()) {
+                       String sequence = Utils.joinStrings(new 
ArrayList<String>(discoveryApproach.getServiceSequence()), ',');
+                       System.out.println(sequence);
+                       if (i < EXPECTED_SERVICE_SEQUENCES.length) {
+                               
Assert.assertTrue(sequence.equals(EXPECTED_SERVICE_SEQUENCES[i++]));
+                       }
+               }
+               Assert.assertEquals("Number of calculated discovery service 
sequences does not match expected value.", 36, 
mapper.getDiscoveryServiceSequences().size());
+
+               deleteDiscoveryServices(true);
+       }
+
+       @Test
+       public void testRecommendedDiscoveryServiceSequence() throws Exception {
+               deleteDiscoveryServices(false);
+               createDiscoveryServices();
+
+               AnalysisRequest request = new AnalysisRequest();
+               request.setAnnotationTypes(Arrays.asList( new String[] { 
"com2", "pre4" }));
+               DeclarativeRequestMapper mapper = new 
DeclarativeRequestMapper(request);
+               Assert.assertEquals("Recommended sequence does not match 
expected string.", "pre4,pre1,ser1", 
Utils.joinStrings(mapper.getRecommendedDiscoveryServiceSequence(), ','));
+
+               deleteDiscoveryServices(true);
+       }
+
+       @Test
+       public void testRemoveFailingService() throws Exception {
+               deleteDiscoveryServices(false);
+               createDiscoveryServices();
+
+               AnalysisRequest request = new AnalysisRequest();
+               request.setAnnotationTypes(Arrays.asList(new String[] { "an1", 
"com2" }));
+               DeclarativeRequestMapper mapper = new 
DeclarativeRequestMapper(request);
+               Assert.assertEquals("Original sequence does not match expected 
string.", EXPECTED_SERVICE_SEQUENCES[0], 
Utils.joinStrings(mapper.getRecommendedDiscoveryServiceSequence(), ','));
+
+               mapper.removeDiscoveryServiceSequences("ser1");
+               Assert.assertEquals("Updated sequence does not match expected 
string.", "pre3,pre2,ser4", 
Utils.joinStrings(mapper.getRecommendedDiscoveryServiceSequence(), ','));
+
+               deleteDiscoveryServices(true);
+       }
+
+       @Test
+       public void testRequestWithManyAnnotationTypes() throws Exception {
+               deleteDiscoveryServices(false);
+               createDiscoveryServices();
+
+               AnalysisRequest request = new AnalysisRequest();
+               request.setAnnotationTypes(Arrays.asList(new String[] {  "an1", 
"an2", "com1", "com2", "pre1", "pre2", "pre4" }));
+               DeclarativeRequestMapper mapper = new 
DeclarativeRequestMapper(request);
+               Assert.assertEquals("Number of calculated discovery service 
sequences does not match expected value.", 75, 
mapper.getDiscoveryServiceSequences().size());
+
+               deleteDiscoveryServices(true);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DefaultThreadManagerTest.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DefaultThreadManagerTest.java
 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DefaultThreadManagerTest.java
new file mode 100755
index 0000000..96a4fee
--- /dev/null
+++ 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DefaultThreadManagerTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.test.controlcenter;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.engine.ThreadStatus;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.ExecutorServiceFactory;
+import org.apache.atlas.odf.core.controlcenter.ODFRunnable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.atlas.odf.core.controlcenter.ThreadManager;
+import org.apache.atlas.odf.core.test.ODFTestLogger;
+import org.apache.atlas.odf.core.test.TimerTestBase;
+
+public class DefaultThreadManagerTest extends TimerTestBase {
+
+       int threadMS = 100;
+       int waitMS = 5000;
+       
+       Logger logger = ODFTestLogger.get();
+
+       class TestRunnable implements ODFRunnable {
+
+               String id;
+               boolean cancelled = false;
+               long msToWaitBeforeFinish;
+               
+               public TestRunnable(String id, long msToWaitBeforeFinish) {
+                       this.id = id;
+                       this.msToWaitBeforeFinish = msToWaitBeforeFinish;
+               }
+               
+               public TestRunnable(String id) {
+                       this(id, threadMS);
+               }
+
+               @Override
+               public void run() {
+                       logger.info("Starting thread with ID: " + id);
+                       try {
+                               Thread.sleep(msToWaitBeforeFinish);
+                       } catch (InterruptedException e) {
+                               // TODO Auto-generated catch block
+                               e.printStackTrace();
+                       }
+                       logger.info("Thread finished with ID: " + id);
+
+               }
+
+               @Override
+               public void setExecutorService(ExecutorService service) {
+                       // TODO Auto-generated method stub
+
+               }
+
+               @Override
+               public void cancel() {
+                       cancelled = true;
+               }
+
+               @Override
+               public boolean isReady() {
+                       return true;
+               }
+
+       }
+
+       @Test
+       public void testSimple() throws Exception {
+               ODFInternalFactory f = new ODFInternalFactory();
+               ThreadManager tm = f.create(ThreadManager.class);
+               
tm.setExecutorService(f.create(ExecutorServiceFactory.class).createExecutorService());
+               assertNotNull(tm);
+
+               String id1 = "id1";
+               String id2 = "id2";
+
+               // start id1
+               ThreadStatus.ThreadState st = tm.getStateOfUnmanagedThread(id1);
+               Assert.assertEquals(ThreadStatus.ThreadState.NON_EXISTENT, st);
+
+               boolean b = tm.startUnmanagedThread(id1, new 
TestRunnable(id1)).isNewThreadCreated();
+               assertTrue(b);
+               b = tm.startUnmanagedThread(id1, new 
TestRunnable(id1)).isNewThreadCreated();
+               assertFalse(b);
+
+               st = tm.getStateOfUnmanagedThread(id1);
+               Assert.assertEquals(ThreadStatus.ThreadState.RUNNING, st);
+
+               // start id2
+               st = tm.getStateOfUnmanagedThread(id2);
+               Assert.assertEquals(ThreadStatus.ThreadState.NON_EXISTENT, st);
+
+               b = tm.startUnmanagedThread(id2, new 
TestRunnable(id2)).isNewThreadCreated();
+               assertTrue(b);
+               b = tm.startUnmanagedThread(id2, new 
TestRunnable(id2)).isNewThreadCreated();
+               assertFalse(b);
+
+               Thread.sleep(waitMS);
+               st = tm.getStateOfUnmanagedThread(id1);
+               Assert.assertEquals(ThreadStatus.ThreadState.FINISHED, st);
+               b = tm.startUnmanagedThread(id1, new 
TestRunnable(id1)).isNewThreadCreated();
+               assertTrue(b);
+
+               st = tm.getStateOfUnmanagedThread(id2);
+               // id2 should be removed from thread list
+               Assert.assertTrue(ThreadStatus.ThreadState.FINISHED.equals(st) 
|| ThreadStatus.ThreadState.NON_EXISTENT.equals(st));
+
+               tm.shutdownThreads(Arrays.asList("id1", "id2"));
+       }
+
+       @Test
+       public void testManyThreads() throws Exception {
+               ODFInternalFactory f = new ODFInternalFactory();
+               ThreadManager tm = f.create(ThreadManager.class);
+               
tm.setExecutorService(f.create(ExecutorServiceFactory.class).createExecutorService());
+
+               assertNotNull(tm);
+
+               List<String> threadIds = new ArrayList<>();
+               int THREAD_NUM = 20;
+               for (int i = 0; i < THREAD_NUM; i++) {
+                       String id = "ThreadID" + i;
+                       threadIds.add(id);
+                       ThreadStatus.ThreadState st = 
tm.getStateOfUnmanagedThread(id);
+                       
Assert.assertEquals(ThreadStatus.ThreadState.NON_EXISTENT, st);
+
+                       boolean b = tm.startUnmanagedThread(id, new 
TestRunnable(id)).isNewThreadCreated();
+                       assertTrue(b);
+                       b = tm.startUnmanagedThread(id, new 
TestRunnable(id)).isNewThreadCreated();
+                       assertFalse(b);
+
+                       st = tm.getStateOfUnmanagedThread(id);
+                       Assert.assertEquals(ThreadStatus.ThreadState.RUNNING, 
st);
+
+               }
+               logger.info("All threads scheduled");
+
+               Thread.sleep(waitMS);
+
+               for (int i = 0; i < THREAD_NUM; i++) {
+                       String id = "ThreadID" + i;
+                       ThreadStatus.ThreadState st = 
tm.getStateOfUnmanagedThread(id);
+                       Assert.assertEquals(ThreadStatus.ThreadState.FINISHED, 
st);
+               }
+               tm.shutdownThreads(threadIds);
+
+       }
+
+}

Reply via email to