http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TimerTestBase.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TimerTestBase.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TimerTestBase.java new file mode 100755 index 0000000..68740e4 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/TimerTestBase.java @@ -0,0 +1,87 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.logging.Logger; + +import org.apache.wink.json4j.JSONException; +import org.junit.AfterClass; +import org.junit.Rule; +import org.junit.rules.Stopwatch; +import org.junit.runner.Description; + +import com.google.common.io.Files; + +public class TimerTestBase { + static final String logFilePath = "/tmp/odf-test-execution-log.csv"; + static Map<String, HashMap<String, Long>> testTimeMap = new HashMap<String, HashMap<String, Long>>(); + final static Logger logger = ODFTestLogger.get(); + + @Rule + public Stopwatch timeWatcher = new Stopwatch() { + @Override + protected void finished(long nanos, Description description) { + HashMap<String, Long> testMap = testTimeMap.get(description.getClassName()); + if (testMap == null) { + testMap = new HashMap<String, Long>(); + testTimeMap.put(description.getClassName(), testMap); + } + testMap.put(description.getMethodName(), (nanos / 1000 / 1000)); + } + }; + + @AfterClass + public static void tearDownAndLogTimes() throws JSONException { + try { + File logFile = new File(logFilePath); + Set<String> uniqueRows = new HashSet<String>(); + if (logFile.exists()) { + uniqueRows = new HashSet<String>(Files.readLines(logFile, StandardCharsets.UTF_8)); + } + + for (Entry<String, HashMap<String, Long>> entry : testTimeMap.entrySet()) { + for (Entry<String, Long> testEntry : entry.getValue().entrySet()) { + String logRow = new StringBuilder().append(testEntry.getKey()).append(",").append(testEntry.getValue()).append(",").append(entry.getKey()).append(",") + .append(System.getProperty("odf.build.project.name", "ProjectNameNotDefined")).toString(); + uniqueRows.add(logRow); + } + } + + StringBuilder logContent = new StringBuilder(); + Iterator<String> rowIterator = uniqueRows.iterator(); + while (rowIterator.hasNext()) { + logContent.append(rowIterator.next()); + if (rowIterator.hasNext()) { + logContent.append("\n"); + } + } + + logger.info("Total time consumed by succeeded tests:\n" + logContent.toString()); + logFile.createNewFile(); + Files.write(logContent.toString().getBytes("UTF-8"), logFile); + } catch (IOException e) { + logger.warning("Error writing test execution log"); + e.printStackTrace(); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationExtensionTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationExtensionTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationExtensionTest.java new file mode 100755 index 0000000..7a1f0ed --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationExtensionTest.java @@ -0,0 +1,114 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.annotation; + +import java.io.IOException; +import java.io.InputStream; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.metadata.models.Annotation; +import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation; +import org.apache.atlas.odf.json.JSONUtils; +import org.apache.wink.json4j.JSONObject; +import org.junit.Assert; +import org.junit.Test; + +import com.fasterxml.jackson.core.Version; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.core.test.TimerTestBase; +import org.apache.atlas.odf.json.AnnotationDeserializer; +import org.apache.atlas.odf.json.AnnotationSerializer; + +public class AnnotationExtensionTest extends TimerTestBase { + + static Logger logger = ODFTestLogger.get(); + + public static <T> T readJSONObjectFromFileInClasspath(ObjectMapper om, Class<T> cl, String pathToFile, ClassLoader classLoader) { + if (classLoader == null) { + // use current classloader if not provided + classLoader = AnnotationExtensionTest.class.getClassLoader(); + } + InputStream is = classLoader.getResourceAsStream(pathToFile); + T result = null; + try { + result = om.readValue(is, cl); + } catch (IOException e) { + // assume that this is a severe error since the provided JSONs should be correct + throw new RuntimeException(e); + } + + return result; + } + + @Test + public void testWithUtils() throws Exception { + testSimple(JSONUtils.getGlobalObjectMapper()); + } + + @Test + public void testWithSeparateObjectMapper() throws Exception { + ObjectMapper om = new ObjectMapper(); + SimpleModule mod = new SimpleModule("annotation module", Version.unknownVersion()); + mod.addDeserializer(Annotation.class, new AnnotationDeserializer()); + mod.addSerializer(Annotation.class, new AnnotationSerializer()); + om.registerModule(mod); + testSimple(om); + } + + private void testSimple(ObjectMapper om) throws Exception { + ExtensionTestAnnotation newTestAnnot = new ExtensionTestAnnotation(); + String strValue = "newstring1"; + int intValue = 4237; + newTestAnnot.setNewStringProp1(strValue); + newTestAnnot.setNewIntProp2(intValue); +// String newTestAnnotJSON = om.writeValueAsString(newTestAnnot); + String newTestAnnotJSON = JSONUtils.toJSON(newTestAnnot).toString(); + logger.info("New test annot JSON: " + newTestAnnotJSON); + + logger.info("Deserializing with " + Annotation.class.getSimpleName() + "class as target class"); + Annotation annot1 = om.readValue(newTestAnnotJSON, Annotation.class); + Assert.assertNotNull(annot1); + logger.info("Deserialized annotation JSON (target: " + Annotation.class.getSimpleName() + "): " + om.writeValueAsString(annot1)); + logger.info("Deserialized annotation class (target: " + Annotation.class.getSimpleName() + "): " + annot1.getClass().getName()); + Assert.assertEquals(ExtensionTestAnnotation.class, annot1.getClass()); + ExtensionTestAnnotation extAnnot1 = (ExtensionTestAnnotation) annot1; + Assert.assertEquals(strValue, extAnnot1.getNewStringProp1()); + Assert.assertEquals(intValue, extAnnot1.getNewIntProp2()); + + /* This does not make sense as you would never enter ExtensionTestAnnotation.class as deserialization target + * which would enforce usage of the standard Bean serializer (since no serializer is registered for this specific class -> jsonProperties can not be mapped + logger.info("Calling deserialization with " + ExtensionTestAnnotation.class.getSimpleName() + " as target"); + ExtensionTestAnnotation annot2 = om.readValue(newTestAnnotJSON, ExtensionTestAnnotation.class); + Assert.assertNotNull(annot2); + logger.info("Deserialized annotation JSON (target: " + ExtensionTestAnnotation.class.getSimpleName() + "): " + om.writeValueAsString(annot2)); + logger.info("Deserialized annotation class (target: " + ExtensionTestAnnotation.class.getSimpleName() + "): " + annot2.getClass().getName()); + Assert.assertEquals(ExtensionTestAnnotation.class, annot2.getClass()); + String s = annot2.getNewStringProp1(); + Assert.assertEquals(strValue, annot2.getNewStringProp1()); + Assert.assertEquals(intValue, annot2.getNewIntProp2()); */ + + logger.info("Processing profiling annotation..."); + Annotation unknownAnnot = readJSONObjectFromFileInClasspath(om, Annotation.class, "org/apache/atlas/odf/core/test/annotation/annotexttest1.json", null); + Assert.assertNotNull(unknownAnnot); + logger.info("Read Unknown annotation: " + unknownAnnot.getClass().getName()); + Assert.assertEquals(ProfilingAnnotation.class, unknownAnnot.getClass()); + + logger.info("Read profiling annotation: " + om.writeValueAsString(unknownAnnot)); + JSONObject jsonPropertiesObj = new JSONObject(unknownAnnot.getJsonProperties()); + Assert.assertEquals("newProp1Value", jsonPropertiesObj.get("newProp1")); + Assert.assertEquals((Integer) 4237, jsonPropertiesObj.get("newProp2")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationStoreTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationStoreTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationStoreTest.java new file mode 100755 index 0000000..b65ce17 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/AnnotationStoreTest.java @@ -0,0 +1,62 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.annotation; + +import java.util.List; +import java.util.UUID; + +import org.apache.atlas.odf.api.metadata.models.Annotation; +import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.atlas.odf.api.metadata.MetaDataObjectReference; +import org.apache.atlas.odf.api.annotation.AnnotationStore; +import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore; +import org.apache.atlas.odf.core.test.ODFTestcase; + +public class AnnotationStoreTest extends ODFTestcase { + + private AnnotationStore createAnnotationStore() { + return new DefaultStatusQueueStore(); + } + + @Test + public void testStoreProfilingAnnotation() throws Exception { + AnnotationStore as = createAnnotationStore(); + + String modRef1Id = UUID.randomUUID().toString(); + MetaDataObjectReference mdoref1 = new MetaDataObjectReference(); + mdoref1.setId(modRef1Id); + + ProfilingAnnotation annot1 = new ProfilingAnnotation(); + annot1.setJsonProperties("{\"a\": \"b\"}"); + annot1.setAnnotationType("AnnotType1"); + annot1.setProfiledObject(mdoref1); + + MetaDataObjectReference annot1Ref = as.store(annot1); + Assert.assertNotNull(annot1Ref.getId()); + List<Annotation> retrievedAnnots = as.getAnnotations(mdoref1, null); + Assert.assertEquals(1, retrievedAnnots.size()); + + Annotation retrievedAnnot = retrievedAnnots.get(0); + Assert.assertTrue(annot1 != retrievedAnnot); + Assert.assertTrue(retrievedAnnot instanceof ProfilingAnnotation); + ProfilingAnnotation retrievedProfilingAnnotation = (ProfilingAnnotation) retrievedAnnot; + Assert.assertEquals(modRef1Id, retrievedProfilingAnnotation.getProfiledObject().getId()); + Assert.assertEquals(annot1Ref, retrievedAnnot.getReference()); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/ExtensionTestAnnotation.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/ExtensionTestAnnotation.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/ExtensionTestAnnotation.java new file mode 100755 index 0000000..cd8f695 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/ExtensionTestAnnotation.java @@ -0,0 +1,39 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.annotation; + +import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation; + +class ExtensionTestAnnotation extends ProfilingAnnotation { + + private String newStringProp1; + private int newIntProp2; + + public String getNewStringProp1() { + return newStringProp1; + } + + public void setNewStringProp1(String newStringProp1) { + this.newStringProp1 = newStringProp1; + } + + public int getNewIntProp2() { + return newIntProp2; + } + + public void setNewIntProp2(int newIntProp2) { + this.newIntProp2 = newIntProp2; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingExtendedAnnotations.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingExtendedAnnotations.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingExtendedAnnotations.java new file mode 100755 index 0000000..f65e3ad --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingExtendedAnnotations.java @@ -0,0 +1,147 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.annotation; + +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.metadata.MetaDataObjectReference; +import org.apache.atlas.odf.api.metadata.models.Annotation; +import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResult; +import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse; +import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.json.JSONUtils; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse; + +public class TestSyncDiscoveryServiceWritingExtendedAnnotations extends DiscoveryServiceBase implements SyncDiscoveryService { + Logger logger = ODFTestLogger.get(); + + public static class SyncDiscoveryServiceAnnotation extends ProfilingAnnotation { + private String prop1 = ""; + private int prop2 = 4237; + private MyObject prop3 = new MyObject(); + + public String getProp1() { + return prop1; + } + + public void setProp1(String prop1) { + this.prop1 = prop1; + } + + public int getProp2() { + return prop2; + } + + public void setProp2(int prop2) { + this.prop2 = prop2; + } + + public MyObject getProp3() { + return prop3; + } + + public void setProp3(MyObject prop3) { + this.prop3 = prop3; + } + + } + + public static class MyObject { + private String anotherProp = ""; + + public String getAnotherProp() { + return anotherProp; + } + + public void setAnotherProp(String anotherProp) { + this.anotherProp = anotherProp; + } + + private MyOtherObject yetAnotherProp = new MyOtherObject(); + + public MyOtherObject getYetAnotherProp() { + return yetAnotherProp; + } + + public void setYetAnotherProp(MyOtherObject yetAnotherProp) { + this.yetAnotherProp = yetAnotherProp; + } + + } + + public static class MyOtherObject { + private String myOtherObjectProperty = ""; + + public String getMyOtherObjectProperty() { + return myOtherObjectProperty; + } + + public void setMyOtherObjectProperty(String myOtherObjectProperty) { + this.myOtherObjectProperty = myOtherObjectProperty; + } + + } + + @Override + public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) { + try { + MetaDataObjectReference dataSetRef = request.getDataSetContainer().getDataSet().getReference(); + + List<Annotation> annotations = new ArrayList<>(); + SyncDiscoveryServiceAnnotation annotation1 = new SyncDiscoveryServiceAnnotation(); + String annotation1_prop1 = "prop1_1_" + dataSetRef.getUrl(); + annotation1.setProp1(annotation1_prop1); + annotation1.setProp2(annotation1_prop1.hashCode()); + annotation1.setProfiledObject(dataSetRef); + MyObject mo1 = new MyObject(); + MyOtherObject moo1 = new MyOtherObject(); + moo1.setMyOtherObjectProperty("nestedtwolevels" + annotation1_prop1); + mo1.setYetAnotherProp(moo1); + mo1.setAnotherProp("nested" + annotation1_prop1); + annotation1.setProp3(mo1); + annotations.add(annotation1); + + SyncDiscoveryServiceAnnotation annotation2 = new SyncDiscoveryServiceAnnotation(); + String annotation2_prop1 = "prop1_2_" + dataSetRef.getUrl(); + annotation2.setProp1(annotation2_prop1); + annotation2.setProp2(annotation2_prop1.hashCode()); + annotation2.setProfiledObject(dataSetRef); + MyObject mo2 = new MyObject(); + MyOtherObject moo2 = new MyOtherObject(); + moo2.setMyOtherObjectProperty("nestedtwolevels" + annotation2_prop1); + mo2.setYetAnotherProp(moo2); + mo2.setAnotherProp("nested" + annotation2_prop1); + annotation2.setProp3(mo2); + annotations.add(annotation2); + + DiscoveryServiceSyncResponse resp = new DiscoveryServiceSyncResponse(); + resp.setCode(DiscoveryServiceResponse.ResponseCode.OK); + DiscoveryServiceResult dsResult = new DiscoveryServiceResult(); + dsResult.setAnnotations(annotations); + resp.setResult(dsResult); + resp.setDetails(this.getClass().getName() + ".runAnalysis finished OK"); + + logger.info("Returning from discovery service " + this.getClass().getSimpleName() + " with result: " + JSONUtils.toJSON(resp)); + return resp; + } catch (Exception exc) { + throw new RuntimeException(exc); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingJsonAnnotations.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingJsonAnnotations.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingJsonAnnotations.java new file mode 100755 index 0000000..91b544c --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/annotation/TestSyncDiscoveryServiceWritingJsonAnnotations.java @@ -0,0 +1,63 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.annotation; + +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.metadata.MetaDataObjectReference; +import org.apache.atlas.odf.api.metadata.models.Annotation; +import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation; +import org.apache.atlas.odf.core.Utils; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResult; +import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse; +import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.json.JSONUtils; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse; + +public class TestSyncDiscoveryServiceWritingJsonAnnotations extends DiscoveryServiceBase implements SyncDiscoveryService { + Logger logger = ODFTestLogger.get(); + private String annotationResult = Utils.getInputStreamAsString(this.getClass().getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/integrationtest/metadata/internal/atlas/nested_annotation_example.json"), "UTF-8"); + + @Override + public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) { + try { + MetaDataObjectReference dataSetRef = request.getDataSetContainer().getDataSet().getReference(); + + List<Annotation> annotations = new ArrayList<>(); + ProfilingAnnotation annotation1 = new ProfilingAnnotation(); + annotation1.setProfiledObject(dataSetRef); + annotation1.setJsonProperties(annotationResult); + annotation1.setAnnotationType("JsonAnnotationWriteTest"); + annotation1.setJavaClass("JsonAnnotationWriteTest"); + annotations.add(annotation1); + + DiscoveryServiceSyncResponse resp = new DiscoveryServiceSyncResponse(); + resp.setCode(DiscoveryServiceResponse.ResponseCode.OK); + DiscoveryServiceResult dsResult = new DiscoveryServiceResult(); + dsResult.setAnnotations(annotations); + resp.setResult(dsResult); + resp.setDetails(this.getClass().getName() + ".runAnalysis finished OK"); + + logger.info("Returning from discovery service " + this.getClass().getSimpleName() + " with result: " + JSONUtils.toJSON(resp)); + return resp; + } catch (Exception exc) { + throw new RuntimeException(exc); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ODFConfigurationTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ODFConfigurationTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ODFConfigurationTest.java new file mode 100755 index 0000000..b1d2518 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ODFConfigurationTest.java @@ -0,0 +1,165 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.configuration; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException; +import org.apache.atlas.odf.api.settings.KafkaMessagingConfiguration; +import org.apache.atlas.odf.api.settings.ODFSettings; +import org.apache.atlas.odf.api.settings.SettingsManager; +import org.apache.atlas.odf.api.settings.validation.ValidationException; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.core.Utils; +import org.apache.atlas.odf.core.configuration.ConfigContainer; +import org.apache.atlas.odf.core.configuration.ConfigManager; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.core.test.ODFTestcase; +import org.apache.atlas.odf.json.JSONUtils; +import org.apache.wink.json4j.JSONException; +import org.apache.wink.json4j.JSONObject; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * this test uses a mocked storage therefore no zookeeper is required + */ +public class ODFConfigurationTest extends ODFTestcase { + + Logger logger = ODFTestLogger.get(); + + @Before + public void setupDefaultConfig() throws JsonParseException, JsonMappingException, IOException, ValidationException, JSONException { + logger.info("reset config to default"); + InputStream is = ODFConfigurationTest.class.getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/test/internal/odf-initial-configuration.json"); + ConfigContainer defaultConfig = new ObjectMapper().readValue(is, ConfigContainer.class); + ConfigManager configManager = new ODFInternalFactory().create(ConfigManager.class); + configManager.updateConfigContainer(defaultConfig); + } + + @Test + public void testUserDefinedMerge() throws JsonParseException, JsonMappingException, IOException { + InputStream is = ODFConfigurationTest.class.getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/test/internal/odf-initial-configuration.json"); + ConfigContainer defaultConfig; + defaultConfig = new ObjectMapper().readValue(is, ConfigContainer.class); + //set testProps to defaultValues to be overwritten + defaultConfig.getOdf().getUserDefined().put("testProp", "defaultValue"); + defaultConfig.getOdf().getUserDefined().put("testProp2", "defaultValue"); + logger.info("Read config: " + defaultConfig); + + //config example with userdefined property testProp to 123 + String value = "{\r\n\t\"odf\" : {\r\n\t\"userDefined\" : {\r\n\t\t\"testProp\" : 123\r\n\t}\r\n}\r\n}\r\n"; + ConfigContainer props = new ObjectMapper().readValue(value, ConfigContainer.class); + Utils.mergeODFPOJOs(defaultConfig, props); + logger.info("Mergded config: " + defaultConfig); + + Assert.assertEquals(123, defaultConfig.getOdf().getUserDefined().get("testProp")); + Assert.assertEquals("defaultValue", defaultConfig.getOdf().getUserDefined().get("testProp2")); + } + + @Test + public void testValidation() throws JsonParseException, JsonMappingException, IOException { + boolean exceptionOccured = false; + String value = "{\r\n\t\"odf\" : {\r\n\t\t\"discoveryServiceWatcherWaitMs\" : -5\r\n\t}\r\n}\r\n"; + try { + ConfigContainer props = new ObjectMapper().readValue(value, ConfigContainer.class); + props.validate(); + } catch (ValidationException e) { + exceptionOccured = true; + } + + Assert.assertTrue(exceptionOccured); + } + + @Test + public void testMerge() throws JsonParseException, JsonMappingException, IOException { + InputStream is = ODFConfigurationTest.class.getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/test/internal/odf-initial-configuration.json"); + ConfigContainer defaultConfig; + defaultConfig = new ObjectMapper().readValue(is, ConfigContainer.class); + //config example with ODF - queueConsumerWaitMs property value 777 + String value = "{\r\n\t\"odf\" : {\r\n\t\t\"discoveryServiceWatcherWaitMs\" : 777\r\n\t}\r\n}\r\n"; + ConfigContainer props = new ObjectMapper().readValue(value, ConfigContainer.class); + Utils.mergeODFPOJOs(defaultConfig, props); + + // TODOCONFIG, move next line to kafka tests + // Assert.assertEquals(777, defaultConfig.getOdf().getQueueConsumerWaitMs().intValue()); + } + + @Test + public void testDeepMerge() throws JsonParseException, JsonMappingException, IOException { + InputStream is = ODFConfigurationTest.class.getClassLoader().getResourceAsStream("org/apache/atlas/odf/core/test/internal/odf-initial-configuration.json"); + ConfigContainer defaultConfig; + defaultConfig = new ObjectMapper().readValue(is, ConfigContainer.class); + //config example with ODF - kafkaConsumer - offsetsStorage property value TEST. All other values for the kafkaConsumer should stay the same! + String value = "{\r\n\t\"odf\" : {\r\n\"messagingConfiguration\": { \"type\": \"" + KafkaMessagingConfiguration.class.getName() + + "\", \t\t\"kafkaConsumerConfig\" : { \r\n\t\t\t\"offsetsStorage\" : \"TEST\"\r\n\t\t}\r\n\t}\r\n}}\r\n"; + ConfigContainer props = new ObjectMapper().readValue(value, ConfigContainer.class); + Utils.mergeODFPOJOs(defaultConfig, props); + + // TODOCONFIG + // Assert.assertEquals("TEST", defaultConfig.getOdf().getKafkaConsumerConfig().getOffsetsStorage()); + //make sure the rest is still default + // Assert.assertEquals(400, defaultConfig.getOdf().getKafkaConsumerConfig().getZookeeperSessionTimeoutMs().intValue()); + } + + @Test + public void testGet() { + Assert.assertTrue(new ODFFactory().create().getSettingsManager().getODFSettings().isReuseRequests()); + } + + @Test + public void testPut() throws InterruptedException, IOException, ValidationException, JSONException, ServiceNotFoundException { + SettingsManager config = new ODFFactory().create().getSettingsManager(); + String propertyId = "my_dummy_test_property"; + int testNumber = 123; + Map<String, Object> cont = config.getUserDefinedConfig(); + cont.put(propertyId, testNumber); + config.updateUserDefined(cont); + Assert.assertEquals(testNumber, config.getUserDefinedConfig().get(propertyId)); + + String testString = "test"; + cont.put(propertyId, testString); + config.updateUserDefined(cont); + + Assert.assertEquals(testString, config.getUserDefinedConfig().get(propertyId)); + + JSONObject testJson = new JSONObject(); + testJson.put("testProp", "test"); + cont.put(propertyId, testJson); + config.updateUserDefined(cont); + + Assert.assertEquals(testJson, config.getUserDefinedConfig().get(propertyId)); + + ODFSettings settings = config.getODFSettings(); + logger.info("Last update object: " + JSONUtils.toJSON(settings)); + Assert.assertNotNull(settings); + Assert.assertNotNull(settings.getUserDefined()); + Assert.assertNotNull(settings.getUserDefined().get(propertyId)); + logger.info("User defined object: " + settings.getUserDefined().get(propertyId).getClass()); + @SuppressWarnings("unchecked") + Map<String, Object> notifiedNestedJSON = (Map<String, Object>) settings.getUserDefined().get(propertyId); + Assert.assertNotNull(notifiedNestedJSON.get("testProp")); + Assert.assertTrue(notifiedNestedJSON.get("testProp") instanceof String); + Assert.assertEquals("test", notifiedNestedJSON.get("testProp")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/PasswordEncryptionTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/PasswordEncryptionTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/PasswordEncryptionTest.java new file mode 100755 index 0000000..aea9a30 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/PasswordEncryptionTest.java @@ -0,0 +1,83 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.configuration; + +import java.util.logging.Logger; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.atlas.odf.core.Encryption; +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.settings.SettingsManager; +import org.apache.atlas.odf.api.settings.SparkConfig; +import org.apache.atlas.odf.api.settings.ODFSettings; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.core.test.TimerTestBase; +import org.apache.atlas.odf.json.JSONUtils; + +public class PasswordEncryptionTest extends TimerTestBase { + Logger logger = ODFTestLogger.get(); + private static final String SPARK_PASSWORD_CONFIG = "spark.authenticate.secret"; + + @Test + public void testGeneralPasswordEncryption() throws Exception { + SettingsManager settings = new ODFFactory().create().getSettingsManager(); + ODFSettings settingsWithPlainPasswords = settings.getODFSettingsHidePasswords(); + settingsWithPlainPasswords.setOdfPassword("newOdfPassword"); + logger.info("Settings with plain password: " + JSONUtils.toJSON(settingsWithPlainPasswords)); + settings.updateODFSettings(settingsWithPlainPasswords); + + ODFSettings settingsWithHiddenPasswords = settings.getODFSettingsHidePasswords(); + String hiddenPasswordIdentifyier = "***hidden***"; + Assert.assertEquals(hiddenPasswordIdentifyier, settingsWithHiddenPasswords.getOdfPassword()); + logger.info("Settings with hidden password: " + JSONUtils.toJSON(settingsWithHiddenPasswords)); + + ODFSettings settingsWithEncryptedPassword = settings.getODFSettings(); + Assert.assertEquals("newOdfPassword", Encryption.decryptText(settingsWithEncryptedPassword.getOdfPassword())); + logger.info("Settings with encrypted password: " + JSONUtils.toJSON(settingsWithEncryptedPassword)); + + // When overwriting settings with hidden passwords, encrypted passwords must be kept internally + settings.updateODFSettings(settingsWithHiddenPasswords); + settingsWithEncryptedPassword = settings.getODFSettings(); + Assert.assertEquals("newOdfPassword", Encryption.decryptText(settingsWithEncryptedPassword.getOdfPassword())); + } + + @Test + public void testSparkConfigEncryption() throws Exception { + SettingsManager settings = new ODFFactory().create().getSettingsManager(); + SparkConfig plainSparkConfig = new SparkConfig(); + plainSparkConfig.setConfig(SPARK_PASSWORD_CONFIG, "plainConfigValue"); + ODFSettings settingsWithPlainPasswords = settings.getODFSettings(); + settingsWithPlainPasswords.setSparkConfig(plainSparkConfig);; + logger.info("Settings with plain password: " + JSONUtils.toJSON(settingsWithPlainPasswords)); + settings.updateODFSettings(settingsWithPlainPasswords); + + ODFSettings settingsWithHiddenPasswords = settings.getODFSettingsHidePasswords(); + String hiddenPasswordIdentifyier = "***hidden***"; + String hiddenConfigValue = (String) settingsWithHiddenPasswords.getSparkConfig().getConfigs().get(SPARK_PASSWORD_CONFIG); + Assert.assertEquals(hiddenPasswordIdentifyier, hiddenConfigValue); + logger.info("Config with hidden password: " + JSONUtils.toJSON(settingsWithHiddenPasswords)); + + ODFSettings settingsWithEncryptedPassword = settings.getODFSettings(); + String encryptedConfigValue = (String) settingsWithEncryptedPassword.getSparkConfig().getConfigs().get(SPARK_PASSWORD_CONFIG); + Assert.assertEquals("plainConfigValue", Encryption.decryptText(encryptedConfigValue)); + logger.info("Config with encrypted password: " + JSONUtils.toJSON(settingsWithEncryptedPassword)); + + // When overwriting settings with hidden passwords, encrypted passwords must be kept internally + settings.updateODFSettings(settingsWithHiddenPasswords); + encryptedConfigValue = (String) settingsWithEncryptedPassword.getSparkConfig().getConfigs().get(SPARK_PASSWORD_CONFIG); + Assert.assertEquals("plainConfigValue", Encryption.decryptText(encryptedConfigValue)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ValidationTests.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ValidationTests.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ValidationTests.java new file mode 100755 index 0000000..3db5778 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/configuration/ValidationTests.java @@ -0,0 +1,103 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.configuration; + +import java.util.Collections; + +import org.apache.atlas.odf.api.settings.validation.EnumValidator; +import org.apache.atlas.odf.api.settings.validation.ValidationException; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.core.configuration.ConfigContainer; +import org.apache.atlas.odf.core.configuration.ConfigManager; +import org.apache.atlas.odf.core.configuration.ServiceValidator; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.api.settings.validation.ImplementationValidator; +import org.apache.atlas.odf.api.settings.validation.NumberPositiveValidator; +import org.apache.atlas.odf.api.settings.validation.PropertyValidator; +import org.apache.atlas.odf.core.test.TimerTestBase; +import org.apache.atlas.odf.core.test.discoveryservice.TestAsyncDiscoveryServiceWritingAnnotations1; +import org.apache.atlas.odf.json.JSONUtils; + +public class ValidationTests extends TimerTestBase { + + @Test + public void testEnum() { + String[] vals = new String[] { "test", "test2" }; + String correct = "test"; + String incorrect = "fail"; + + Assert.assertTrue(validateTest(correct, new EnumValidator(vals))); + Assert.assertFalse(validateTest(incorrect, new EnumValidator(vals))); + } + + @Test + public void testImplementation() { + String correct = TestAsyncDiscoveryServiceWritingAnnotations1.class.getName(); + String incorrect = "dummyClass"; + Assert.assertTrue(validateTest(correct, new ImplementationValidator())); + Assert.assertFalse(validateTest(incorrect, new ImplementationValidator())); + } + + @Test + public void testService() throws Exception { + String s = "{\r\n" + + " \"id\": \"asynctestservice\",\r\n" + + " \"name\": \"Async test\",\r\n" + + " \"description\": \"The async test service\",\r\n" + + " \"endpoint\": {\r\n" + + " \"runtimeName\": \"Java\",\r\n" + + " \"className\": \"TestAsyncDiscoveryService1\"\r\n" + + " }\r\n" + + " }"; + + DiscoveryServiceProperties newService = JSONUtils.fromJSON(s, DiscoveryServiceProperties.class); + DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager(); + //ODFConfig odfConfig = new ODFFactory().create(ODFConfiguration.class).getODFConfig(); + + ConfigContainer new1 = new ConfigContainer(); + new1.setRegisteredServices(Collections.singletonList(newService)); + ConfigManager configManager = new ODFInternalFactory().create(ConfigManager.class); + configManager.updateConfigContainer(new1); + + DiscoveryServiceProperties correct = discoveryServicesManager.getDiscoveryServicesProperties().get(0); + Assert.assertEquals("asynctestservice", correct.getId()); + correct.setId("newId"); + DiscoveryServiceProperties incorrect = new DiscoveryServiceProperties(); + Assert.assertTrue(validateTest(correct, new ServiceValidator())); + Assert.assertFalse(validateTest(incorrect, new ServiceValidator())); + } + + @Test + public void testNumber() { + int correct = 5; + int incorrect = -5; + Assert.assertTrue(validateTest(correct, new NumberPositiveValidator())); + Assert.assertFalse(validateTest(incorrect, new NumberPositiveValidator())); + } + + private boolean validateTest(Object value, PropertyValidator validator) { + try { + validator.validate(null, value); + return true; + } catch (ValidationException ex) { + return false; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisProcessingTests.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisProcessingTests.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisProcessingTests.java new file mode 100755 index 0000000..4fa2eda --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisProcessingTests.java @@ -0,0 +1,139 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.controlcenter; + +import java.text.MessageFormat; +import java.util.Arrays; +import java.util.List; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.analysis.AnalysisCancelResult; +import org.apache.atlas.odf.api.analysis.AnalysisRequest; +import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus; +import org.apache.atlas.odf.api.analysis.AnalysisResponse; +import org.apache.atlas.odf.api.settings.MessagingConfiguration; +import org.apache.atlas.odf.api.settings.ODFSettings; +import org.apache.atlas.odf.api.settings.SettingsManager; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker; +import org.apache.atlas.odf.core.controlcenter.ControlCenter; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.core.test.ODFTestcase; +import org.apache.atlas.odf.json.JSONUtils; + +public class AnalysisProcessingTests extends ODFTestcase { + Logger logger = ODFTestLogger.get(); + + @Test + public void testAnalysisProcessingAfterShutdown() throws Exception { + final SettingsManager config = new ODFFactory().create().getSettingsManager(); + final ODFSettings odfSettings = config.getODFSettings(); + final MessagingConfiguration messagingConfiguration = odfSettings.getMessagingConfiguration(); + final Long origRequestRetentionMs = messagingConfiguration.getAnalysisRequestRetentionMs(); + messagingConfiguration.setAnalysisRequestRetentionMs(300000l); + config.updateODFSettings(odfSettings); + + ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class); + AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null); + AnalysisRequest req = tracker.getRequest(); + req.setDiscoveryServiceSequence(Arrays.asList("asynctestservice")); + req.getDataSets().get(0).setId(ODFAPITest.DUMMY_SUCCESS_ID + "_dataset"); + final AnalysisResponse startRequest = cc.startRequest(req); + logger.info("Analysis :" + startRequest.getId()); + + Assert.assertNull(startRequest.getOriginalRequest()); + Assert.assertFalse(startRequest.isInvalidRequest()); + final AnalysisResponse duplicate = cc.startRequest(req); + Assert.assertNotNull(duplicate.getOriginalRequest()); + Assert.assertEquals(startRequest.getId(), duplicate.getId()); + logger.info("Analysis1 duplciate :" + duplicate.getId()); + + final AnalysisCancelResult cancelRequest = cc.cancelRequest(startRequest.getId()); + Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, cancelRequest.getState()); + + cc.getQueueManager().stop(); + + AnalysisResponse response2 = cc.startRequest(req); + logger.info("Analysis2:" + response2.getId()); + AnalysisRequestStatus requestStatus = cc.getRequestStatus(response2.getId()); + int maxWait = 20; + + int currentWait = 0; + while (currentWait < maxWait && requestStatus.getState() != AnalysisRequestStatus.State.ACTIVE) { + Thread.sleep(100); + currentWait++; + requestStatus = cc.getRequestStatus(response2.getId()); + } + logger.info("THREAD ACTIVE, KILL IT!"); + + cc.getQueueManager().start(); + logger.info("restarted"); + Assert.assertNull(response2.getOriginalRequest()); + Assert.assertFalse(response2.isInvalidRequest()); + + messagingConfiguration.setAnalysisRequestRetentionMs(origRequestRetentionMs); + config.updateODFSettings(odfSettings); + + currentWait = 0; + while (currentWait < maxWait && requestStatus.getState() != AnalysisRequestStatus.State.FINISHED) { + Thread.sleep(100); + requestStatus = cc.getRequestStatus(response2.getId()); + } + Assert.assertEquals(AnalysisRequestStatus.State.FINISHED, requestStatus.getState()); + } + + @Test + public void testRequestWithAnnotationTypes() throws Exception { + ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class); + AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null); + AnalysisRequest req = tracker.getRequest(); + req.getDataSets().get(0).setId(ODFAPITest.DUMMY_SUCCESS_ID + "_dataset"); + List<String> annotationTypes = Arrays.asList(new String[] { "AsyncTestDummyAnnotation" }); + req.setAnnotationTypes(annotationTypes); + logger.info(MessageFormat.format("Running discovery request for annotation type {0}.", annotationTypes)); + AnalysisResponse resp = cc.startRequest(req); + logger.info(MessageFormat.format("Started request id {0}.", resp.getId())); + Assert.assertNotNull(resp.getId()); + Assert.assertFalse(resp.isInvalidRequest()); + + int currentWait = 0; + int maxWait = 20; + AnalysisRequestStatus requestStatus = cc.getRequestStatus(resp.getId()); + while (currentWait < maxWait && requestStatus.getState() != AnalysisRequestStatus.State.FINISHED) { + Thread.sleep(100); + requestStatus = cc.getRequestStatus(resp.getId()); + } + Assert.assertEquals(AnalysisRequestStatus.State.FINISHED, requestStatus.getState()); + Assert.assertEquals("Generated service has incorrect number of elements.", 1, requestStatus.getRequest().getDiscoveryServiceSequence().size()); + Assert.assertEquals("Generated service sequence differs from expected value.", "asynctestservice", requestStatus.getRequest().getDiscoveryServiceSequence().get(0)); + } + + @Test + public void testRequestWithMissingAnnotationTypes() throws Exception { + ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class); + AnalysisRequestTracker tracker = JSONUtils.readJSONObjectFromFileInClasspath(AnalysisRequestTracker.class, "org/apache/atlas/odf/core/test/messaging/kafka/tracker1.json", null); + AnalysisRequest req = tracker.getRequest(); + req.getDataSets().get(0).setId(ODFAPITest.DUMMY_SUCCESS_ID + "_dataset"); + List<String> annotationTypes = Arrays.asList(new String[] { "noServiceExistsForThisAnnotationType" }); + req.setAnnotationTypes(annotationTypes); + logger.info(MessageFormat.format("Running discovery request for non-existing annotation type {0}.", annotationTypes)); + AnalysisResponse resp = cc.startRequest(req); + Assert.assertTrue(resp.isInvalidRequest()); + Assert.assertEquals("Unexpected error message.", "No suitable discovery services found to create the requested annotation types.", resp.getDetails()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestCancellationTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestCancellationTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestCancellationTest.java new file mode 100755 index 0000000..fd39e15 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestCancellationTest.java @@ -0,0 +1,104 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.controlcenter; + +import java.util.Collections; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.analysis.AnalysisCancelResult; +import org.apache.atlas.odf.api.analysis.AnalysisRequest; +import org.apache.atlas.odf.api.metadata.MetaDataObjectReference; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore; +import org.apache.atlas.odf.core.test.ODFTestcase; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.atlas.odf.core.Utils; +import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS; +import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker; +import org.apache.atlas.odf.core.controlcenter.ControlCenter; +import org.apache.atlas.odf.core.test.ODFTestLogger; + +public class AnalysisRequestCancellationTest extends ODFTestcase { + + Logger logger = ODFTestLogger.get(); + + AnalysisRequestTracker generateTracker(String id, STATUS status) { + AnalysisRequestTracker tracker = new AnalysisRequestTracker(); + Utils.setCurrentTimeAsLastModified(tracker); + tracker.setNextDiscoveryServiceRequest(0); + AnalysisRequest req = new AnalysisRequest(); + req.setId(id); + MetaDataObjectReference ref = new MetaDataObjectReference(); + ref.setId("DataSet" + id); + req.setDataSets(Collections.singletonList(ref)); + tracker.setRequest(req); + tracker.setStatus(status); + return tracker; + } + + @Test + public void testRequestCancellationNotFoundFailure() { + ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class); + AnalysisCancelResult cancelRequest = cc.cancelRequest("dummy_id"); + Assert.assertEquals(cancelRequest.getState(), AnalysisCancelResult.State.NOT_FOUND); + } + + @Test + public void testRequestCancellationWrongStateFailure() { + ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class); + AnalysisRequestTrackerStore store = (new ODFInternalFactory()).create(AnalysisRequestTrackerStore.class); + String testId = "test_id1"; + AnalysisRequestTracker tracker = null; + AnalysisCancelResult cancelRequest = null; + + tracker = generateTracker(testId, STATUS.FINISHED); + store.store(tracker); + cancelRequest = cc.cancelRequest(testId); + Assert.assertEquals(cancelRequest.getState(), AnalysisCancelResult.State.INVALID_STATE); + + tracker = generateTracker(testId, STATUS.ERROR); + store.store(tracker); + cancelRequest = cc.cancelRequest(testId); + Assert.assertEquals(cancelRequest.getState(), AnalysisCancelResult.State.INVALID_STATE); + + tracker = generateTracker(testId, STATUS.CANCELLED); + store.store(tracker); + cancelRequest = cc.cancelRequest(testId); + Assert.assertEquals(cancelRequest.getState(), AnalysisCancelResult.State.INVALID_STATE); + } + + @Test + public void testRequestCancellationSuccess() { + ControlCenter cc = new ODFInternalFactory().create(ControlCenter.class); + AnalysisRequestTrackerStore store = (new ODFInternalFactory()).create(AnalysisRequestTrackerStore.class); + String testId = "test_id2"; + + AnalysisRequestTracker tracker = generateTracker(testId, STATUS.INITIALIZED); + store.store(tracker); + AnalysisCancelResult cancelRequest = cc.cancelRequest(testId); + Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, cancelRequest.getState()); + + tracker = generateTracker(testId, STATUS.IN_DISCOVERY_SERVICE_QUEUE); + store.store(tracker); + cancelRequest = cc.cancelRequest(testId); + Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, cancelRequest.getState()); + + tracker = generateTracker(testId, STATUS.DISCOVERY_SERVICE_RUNNING); + store.store(tracker); + cancelRequest = cc.cancelRequest(testId); + Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, cancelRequest.getState()); +} +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestTrackerStoreTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestTrackerStoreTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestTrackerStoreTest.java new file mode 100755 index 0000000..7eb46d8 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/AnalysisRequestTrackerStoreTest.java @@ -0,0 +1,105 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.controlcenter; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.analysis.AnalysisRequest; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.junit.Test; + +import org.apache.atlas.odf.api.metadata.MetaDataObjectReference; +import org.apache.atlas.odf.core.Utils; +import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS; +import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker; +import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.core.test.ODFTestcase; + +public class AnalysisRequestTrackerStoreTest extends ODFTestcase { + + Logger logger = ODFTestLogger.get(); + + AnalysisRequestTracker generateTracker(String id, STATUS status) { + AnalysisRequestTracker tracker = new AnalysisRequestTracker(); + Utils.setCurrentTimeAsLastModified(tracker); + tracker.setNextDiscoveryServiceRequest(0); + AnalysisRequest req = new AnalysisRequest(); + req.setId(id); + MetaDataObjectReference ref = new MetaDataObjectReference(); + ref.setId("DataSet" + id); + req.setDataSets(Collections.singletonList(ref)); + tracker.setRequest(req); + tracker.setStatus(status); + return tracker; + } + + @Test + public void testStore() throws Exception { + AnalysisRequestTrackerStore store = (new ODFInternalFactory()).create(AnalysisRequestTrackerStore.class); + assertNotNull(store); + int MAX_TRACKERS = 50; + List<AnalysisRequestTracker> trackers1 = new ArrayList<AnalysisRequestTracker>(); + STATUS lastStatus = STATUS.IN_DISCOVERY_SERVICE_QUEUE; + for (int i = 0; i < MAX_TRACKERS; i++) { + trackers1.add(generateTracker("STORETEST_ID" + i, lastStatus)); + } + + logger.info("Storing " + MAX_TRACKERS + " Trackers"); + long pass1Start = System.currentTimeMillis(); + for (AnalysisRequestTracker tracker : trackers1) { + store.store(tracker); + } + long pass1End = System.currentTimeMillis(); + + logger.info("Storing " + MAX_TRACKERS + " Trackers again with new status"); + + lastStatus = STATUS.FINISHED; + List<AnalysisRequestTracker> trackers2 = new ArrayList<AnalysisRequestTracker>(); + for (int i = 0; i < MAX_TRACKERS; i++) { + trackers2.add(generateTracker("STORETEST_ID" + i, lastStatus)); + } + long pass2Start = System.currentTimeMillis(); + for (AnalysisRequestTracker tracker : trackers2) { + store.store(tracker); + } + long pass2End = System.currentTimeMillis(); + + Thread.sleep(2000); + logger.info("Querying and checking " + MAX_TRACKERS + " Trackers"); + + long queryStart = System.currentTimeMillis(); + + for (int i = 0; i < MAX_TRACKERS; i++) { + final String analysisRequestId = "STORETEST_ID" + i; + AnalysisRequestTracker tracker = store.query(analysisRequestId); + assertNotNull(tracker); + assertEquals(1, tracker.getRequest().getDataSets().size()); + MetaDataObjectReference ref = new MetaDataObjectReference(); + ref.setId("DataSet" + analysisRequestId); + assertEquals(tracker.getRequest().getDataSets().get(0), ref); + assertEquals(lastStatus, tracker.getStatus()); + } + long queryEnd = System.currentTimeMillis(); + + System.out.println("First pass: " + (pass1End - pass1Start) + "ms, second pass: " + (pass2End - pass2Start) + "ms, query: " + (queryEnd - queryStart) + "ms"); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DeclarativeRequestMapperTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DeclarativeRequestMapperTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DeclarativeRequestMapperTest.java new file mode 100755 index 0000000..347fb84 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DeclarativeRequestMapperTest.java @@ -0,0 +1,158 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.controlcenter; + +import org.apache.atlas.odf.api.analysis.AnalysisRequest; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceEndpoint; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceJavaEndpoint; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager; +import org.apache.atlas.odf.api.settings.validation.ValidationException; +import org.apache.atlas.odf.core.controlcenter.DeclarativeRequestMapper; +import org.apache.atlas.odf.core.test.ODFTestBase; +import org.apache.atlas.odf.json.JSONUtils; +import org.apache.wink.json4j.JSONException; +import org.junit.Assert; +import org.junit.Test; + +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.core.Utils; +import org.apache.atlas.odf.core.controlcenter.ControlCenter; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException; + +public class DeclarativeRequestMapperTest extends ODFTestBase { + final private static String SERVICE_CLASSNAME = "TestAsyncDiscoveryService1"; + final private static String[] EXPECTED_SERVICE_SEQUENCES = new String[] { "pre3,ser1", "alt1,ser1", "pre4,pre1,ser1", + "pre3,ser1,ser3", "pre3,ser1,ser5", "alt1,ser1,ser3", "alt1,ser1,ser5", "pre3,pre2,ser4", "alt1,pre2,ser4", + "pre4,pre1,ser1,ser3", "pre4,pre1,ser1,ser5", "pre3,ser1,alt1,ser3", "pre3,ser1,pre2,ser4", "pre3,ser1,alt1,ser5" }; + private Logger logger = Logger.getLogger(ControlCenter.class.getName()); + + private static void createDiscoveryService(String serviceId, String[] resultingAnnotationTypes, String[] prerequisiteAnnotationTypes, String[] supportedObjectTypes) throws ValidationException, JSONException { + DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager(); + DiscoveryServiceProperties dsProperties = new DiscoveryServiceProperties(); + DiscoveryServiceJavaEndpoint dse = new DiscoveryServiceJavaEndpoint(); + dse.setClassName(SERVICE_CLASSNAME); + dsProperties.setEndpoint(JSONUtils.convert(dse, DiscoveryServiceEndpoint.class)); + dsProperties.setId(serviceId); + dsProperties.setName(serviceId + " Discovery Service"); + dsProperties.setPrerequisiteAnnotationTypes(Arrays.asList(prerequisiteAnnotationTypes)); + dsProperties.setResultingAnnotationTypes(Arrays.asList(resultingAnnotationTypes)); + dsProperties.setSupportedObjectTypes(Arrays.asList(supportedObjectTypes)); + discoveryServicesManager.createDiscoveryService(dsProperties); + } + + private void deleteDiscoveryService(String serviceId, boolean failOnError) throws ValidationException { + DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager(); + try { + discoveryServicesManager.deleteDiscoveryService(serviceId); + } + catch (ServiceNotFoundException e) { + if (failOnError) { + Assert.fail("Error deleting discovery services."); + } + } + } + + private void deleteDiscoveryServices(boolean failOnError) throws ValidationException { + List<String> serviceIds = Arrays.asList(new String[] { "ser1", "ser2", "ser3", "ser4", "ser5", "pre1", "pre2", "pre3", "pre4", "alt1" }); + for (String serviceId : serviceIds) { + deleteDiscoveryService(serviceId, failOnError); + } + } + + private void createDiscoveryServices() throws ValidationException, JSONException { + createDiscoveryService("ser1", new String[] { "an1", "com1", "com2" }, new String[] { "pre1" }, new String[] { "Table", "DataFile" }); + createDiscoveryService("ser2", new String[] { "an2", "com1" }, new String[] { "pre2" }, new String[] { "Table", "DataFile" }); + createDiscoveryService("ser3", new String[] { "com2" }, new String[] { "pre1" }, new String[] { "Table", "DataFile" }); + createDiscoveryService("ser4", new String[] { "an1", "com1", "com2" }, new String[] { "pre1", "pre2" }, new String[] { "Table", "DataFile" }); + createDiscoveryService("ser5", new String[] { "com1", "com2" }, new String[] { "pre1" }, new String[] { "Table", "DataFile" }); + + createDiscoveryService("pre1", new String[] { "pre1" }, new String[] { "pre4" }, new String[] { "Table", "DataFile" }); + createDiscoveryService("pre2", new String[] { "pre2" }, new String[] { }, new String[] { "Table", "DataFile" }); + createDiscoveryService("pre3", new String[] { "pre1" }, new String[] { }, new String[] { "Table", "DataFile" }); + createDiscoveryService("pre4", new String[] { "pre4" }, new String[] { }, new String[] { "Table", "DataFile" }); + + createDiscoveryService("alt1", new String[] { "pre1" }, new String[] { }, new String[] { "Table", "DataFile" }); + } + + @Test + public void testDiscoveryServiceSequences() throws Exception { + deleteDiscoveryServices(false); + createDiscoveryServices(); + + AnalysisRequest request = new AnalysisRequest(); + request.setAnnotationTypes(Arrays.asList( new String[] { "an1", "com2" })); + DeclarativeRequestMapper mapper = new DeclarativeRequestMapper(request); + logger.log(Level.INFO, "Printing list of mapper result to stdout."); + int i = 0; + for (DeclarativeRequestMapper.DiscoveryServiceSequence discoveryApproach : mapper.getDiscoveryServiceSequences()) { + String sequence = Utils.joinStrings(new ArrayList<String>(discoveryApproach.getServiceSequence()), ','); + System.out.println(sequence); + if (i < EXPECTED_SERVICE_SEQUENCES.length) { + Assert.assertTrue(sequence.equals(EXPECTED_SERVICE_SEQUENCES[i++])); + } + } + Assert.assertEquals("Number of calculated discovery service sequences does not match expected value.", 36, mapper.getDiscoveryServiceSequences().size()); + + deleteDiscoveryServices(true); + } + + @Test + public void testRecommendedDiscoveryServiceSequence() throws Exception { + deleteDiscoveryServices(false); + createDiscoveryServices(); + + AnalysisRequest request = new AnalysisRequest(); + request.setAnnotationTypes(Arrays.asList( new String[] { "com2", "pre4" })); + DeclarativeRequestMapper mapper = new DeclarativeRequestMapper(request); + Assert.assertEquals("Recommended sequence does not match expected string.", "pre4,pre1,ser1", Utils.joinStrings(mapper.getRecommendedDiscoveryServiceSequence(), ',')); + + deleteDiscoveryServices(true); + } + + @Test + public void testRemoveFailingService() throws Exception { + deleteDiscoveryServices(false); + createDiscoveryServices(); + + AnalysisRequest request = new AnalysisRequest(); + request.setAnnotationTypes(Arrays.asList(new String[] { "an1", "com2" })); + DeclarativeRequestMapper mapper = new DeclarativeRequestMapper(request); + Assert.assertEquals("Original sequence does not match expected string.", EXPECTED_SERVICE_SEQUENCES[0], Utils.joinStrings(mapper.getRecommendedDiscoveryServiceSequence(), ',')); + + mapper.removeDiscoveryServiceSequences("ser1"); + Assert.assertEquals("Updated sequence does not match expected string.", "pre3,pre2,ser4", Utils.joinStrings(mapper.getRecommendedDiscoveryServiceSequence(), ',')); + + deleteDiscoveryServices(true); + } + + @Test + public void testRequestWithManyAnnotationTypes() throws Exception { + deleteDiscoveryServices(false); + createDiscoveryServices(); + + AnalysisRequest request = new AnalysisRequest(); + request.setAnnotationTypes(Arrays.asList(new String[] { "an1", "an2", "com1", "com2", "pre1", "pre2", "pre4" })); + DeclarativeRequestMapper mapper = new DeclarativeRequestMapper(request); + Assert.assertEquals("Number of calculated discovery service sequences does not match expected value.", 75, mapper.getDiscoveryServiceSequences().size()); + + deleteDiscoveryServices(true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DefaultThreadManagerTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DefaultThreadManagerTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DefaultThreadManagerTest.java new file mode 100755 index 0000000..96a4fee --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/DefaultThreadManagerTest.java @@ -0,0 +1,172 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.controlcenter; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.engine.ThreadStatus; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.core.controlcenter.ExecutorServiceFactory; +import org.apache.atlas.odf.core.controlcenter.ODFRunnable; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.atlas.odf.core.controlcenter.ThreadManager; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.core.test.TimerTestBase; + +public class DefaultThreadManagerTest extends TimerTestBase { + + int threadMS = 100; + int waitMS = 5000; + + Logger logger = ODFTestLogger.get(); + + class TestRunnable implements ODFRunnable { + + String id; + boolean cancelled = false; + long msToWaitBeforeFinish; + + public TestRunnable(String id, long msToWaitBeforeFinish) { + this.id = id; + this.msToWaitBeforeFinish = msToWaitBeforeFinish; + } + + public TestRunnable(String id) { + this(id, threadMS); + } + + @Override + public void run() { + logger.info("Starting thread with ID: " + id); + try { + Thread.sleep(msToWaitBeforeFinish); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + logger.info("Thread finished with ID: " + id); + + } + + @Override + public void setExecutorService(ExecutorService service) { + // TODO Auto-generated method stub + + } + + @Override + public void cancel() { + cancelled = true; + } + + @Override + public boolean isReady() { + return true; + } + + } + + @Test + public void testSimple() throws Exception { + ODFInternalFactory f = new ODFInternalFactory(); + ThreadManager tm = f.create(ThreadManager.class); + tm.setExecutorService(f.create(ExecutorServiceFactory.class).createExecutorService()); + assertNotNull(tm); + + String id1 = "id1"; + String id2 = "id2"; + + // start id1 + ThreadStatus.ThreadState st = tm.getStateOfUnmanagedThread(id1); + Assert.assertEquals(ThreadStatus.ThreadState.NON_EXISTENT, st); + + boolean b = tm.startUnmanagedThread(id1, new TestRunnable(id1)).isNewThreadCreated(); + assertTrue(b); + b = tm.startUnmanagedThread(id1, new TestRunnable(id1)).isNewThreadCreated(); + assertFalse(b); + + st = tm.getStateOfUnmanagedThread(id1); + Assert.assertEquals(ThreadStatus.ThreadState.RUNNING, st); + + // start id2 + st = tm.getStateOfUnmanagedThread(id2); + Assert.assertEquals(ThreadStatus.ThreadState.NON_EXISTENT, st); + + b = tm.startUnmanagedThread(id2, new TestRunnable(id2)).isNewThreadCreated(); + assertTrue(b); + b = tm.startUnmanagedThread(id2, new TestRunnable(id2)).isNewThreadCreated(); + assertFalse(b); + + Thread.sleep(waitMS); + st = tm.getStateOfUnmanagedThread(id1); + Assert.assertEquals(ThreadStatus.ThreadState.FINISHED, st); + b = tm.startUnmanagedThread(id1, new TestRunnable(id1)).isNewThreadCreated(); + assertTrue(b); + + st = tm.getStateOfUnmanagedThread(id2); + // id2 should be removed from thread list + Assert.assertTrue(ThreadStatus.ThreadState.FINISHED.equals(st) || ThreadStatus.ThreadState.NON_EXISTENT.equals(st)); + + tm.shutdownThreads(Arrays.asList("id1", "id2")); + } + + @Test + public void testManyThreads() throws Exception { + ODFInternalFactory f = new ODFInternalFactory(); + ThreadManager tm = f.create(ThreadManager.class); + tm.setExecutorService(f.create(ExecutorServiceFactory.class).createExecutorService()); + + assertNotNull(tm); + + List<String> threadIds = new ArrayList<>(); + int THREAD_NUM = 20; + for (int i = 0; i < THREAD_NUM; i++) { + String id = "ThreadID" + i; + threadIds.add(id); + ThreadStatus.ThreadState st = tm.getStateOfUnmanagedThread(id); + Assert.assertEquals(ThreadStatus.ThreadState.NON_EXISTENT, st); + + boolean b = tm.startUnmanagedThread(id, new TestRunnable(id)).isNewThreadCreated(); + assertTrue(b); + b = tm.startUnmanagedThread(id, new TestRunnable(id)).isNewThreadCreated(); + assertFalse(b); + + st = tm.getStateOfUnmanagedThread(id); + Assert.assertEquals(ThreadStatus.ThreadState.RUNNING, st); + + } + logger.info("All threads scheduled"); + + Thread.sleep(waitMS); + + for (int i = 0; i < THREAD_NUM; i++) { + String id = "ThreadID" + i; + ThreadStatus.ThreadState st = tm.getStateOfUnmanagedThread(id); + Assert.assertEquals(ThreadStatus.ThreadState.FINISHED, st); + } + tm.shutdownThreads(threadIds); + + } + +}
