// http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ODFAPITest.java
// diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ODFAPITest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ODFAPITest.java
// new file mode 100755, index 0000000..900c214
/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.atlas.odf.core.test.controlcenter;

import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;

import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
import org.apache.atlas.odf.api.metadata.MetadataStore;
import org.apache.atlas.odf.api.metadata.models.DataSet;
import org.apache.atlas.odf.api.metadata.models.UnknownDataSet;
import org.apache.atlas.odf.core.ODFInternalFactory;
import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore;
import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore;
import org.apache.atlas.odf.core.metadata.DefaultMetadataStore;
import org.apache.atlas.odf.core.test.ODFTestBase;
import org.apache.wink.json4j.JSONException;
import org.apache.wink.json4j.JSONObject;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import org.apache.atlas.odf.api.ODFFactory;
import org.apache.atlas.odf.api.analysis.AnalysisCancelResult;
import org.apache.atlas.odf.api.analysis.AnalysisManager;
import org.apache.atlas.odf.api.analysis.AnalysisRequest;
import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
import org.apache.atlas.odf.api.analysis.AnalysisResponse;
import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;

/**
 * Tests that submit analysis requests through the {@link AnalysisManager} API and poll
 * their status until they leave the ACTIVE / QUEUED / NOT_FOUND states.
 *
 * <p>The static helpers ({@code runRequest}, {@code waitForRequest},
 * {@code createAnalysisRequest}, ...) are also called from other test classes, so their
 * signatures are kept stable.
 */
public class ODFAPITest extends ODFTestBase {

    // Kept public and non-final so that existing code reading or assigning them keeps compiling.
    public static int WAIT_MS_BETWEEN_POLLING = 500;
    public static int MAX_NUMBER_OF_POLLS = 500;
    public static String DUMMY_SUCCESS_ID = "success";
    public static String DUMMY_ERROR_ID = "error";

    // Log patterns for a single poll. All of them are formatted with the same argument array:
    // {0} request id, {1} state, {2} details, {3} expected state (may be null), {4} poll index.
    private static final String POLL_MESSAGE =
            "Poll request for request ID ''{0}'', state: ''{1}'', details: ''{2}''";
    private static final String POLL_MESSAGE_WITH_EXPECTED_STATE =
            "Poll request for request ID ''{0}'' (expected state: ''{3}''): state: ''{1}'', details: ''{2}''";
    private static final String NUMBERED_POLL_MESSAGE =
            "{4}th poll request for request ID ''{0}'' (expected state: ''{3}''): state: ''{1}'', details: ''{2}''";

    /** Last observed status of a request together with the number of polls it took to get there. */
    private static final class PollOutcome {
        final AnalysisRequestStatus status;
        final int polls;

        PollOutcome(AnalysisRequestStatus status, int polls) {
            this.status = status;
            this.polls = polls;
        }

        /** @return true if polling stopped because the poll budget ran out, not because the request settled */
        boolean timedOut() {
            return isPending(status.getState());
        }
    }

    /** @return true while a request in this state is still worth polling */
    private static boolean isPending(AnalysisRequestStatus.State state) {
        return state == AnalysisRequestStatus.State.ACTIVE
                || state == AnalysisRequestStatus.State.QUEUED
                || state == AnalysisRequestStatus.State.NOT_FOUND;
    }

    /**
     * Polls the status of a request until it is no longer pending or the poll budget is used up.
     * At least one poll is always made, and the method sleeps {@code waitMs} after every poll
     * (including the last one), exactly like the loops it replaces.
     *
     * @param analysisManager manager used to query the status
     * @param requestId       id of the request to poll
     * @param maxPolls        maximum number of polls
     * @param waitMs          pause after each poll in milliseconds
     * @param pollMessage     log pattern for each poll (see the pattern constants above)
     * @param expectedState   state the caller expects; only used for logging, may be null
     * @return the last observed status and the number of polls made
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static PollOutcome pollUntilSettled(AnalysisManager analysisManager, String requestId, int maxPolls, long waitMs,
            String pollMessage, AnalysisRequestStatus.State expectedState) throws InterruptedException {
        AnalysisRequestStatus status;
        int polls = 0;
        do {
            status = analysisManager.getAnalysisRequestStatus(requestId);
            log.log(Level.INFO, pollMessage, new Object[] { requestId, status.getState(), status.getDetails(), expectedState, polls });
            polls++;
            Thread.sleep(waitMs);
        } while (polls < maxPolls && isPending(status.getState()));
        return new PollOutcome(status, polls);
    }

    /**
     * Variant of {@link #pollUntilSettled} for callers that do not declare checked exceptions.
     * An interrupt restores the thread's interrupt flag and is rethrown as a RuntimeException.
     */
    private static PollOutcome pollUntilSettledUnchecked(AnalysisManager analysisManager, String requestId, int maxPolls) {
        try {
            return pollUntilSettled(analysisManager, requestId, maxPolls, WAIT_MS_BETWEEN_POLLING, POLL_MESSAGE, null);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Single-data-set convenience overload of {@link #runRequestAndCheckResult(List, AnalysisRequestStatus.State, int)}. */
    public static void runRequestAndCheckResult(String dataSetID, AnalysisRequestStatus.State expectedState, int expectedProcessedDiscoveryRequests) throws Exception {
        runRequestAndCheckResult(Collections.singletonList(dataSetID), expectedState, expectedProcessedDiscoveryRequests);
    }

    /**
     * Starts an analysis on the given data sets, waits for it to settle and verifies the final
     * state and the number of discovery service responses recorded in the request tracker.
     *
     * @param dataSetIDs                         ids of the data sets to analyze
     * @param expectedState                      state the request must end up in
     * @param expectedProcessedDiscoveryRequests expected number of responses in the tracker, or -1 to
     *                                           expect one response per discovery service request
     */
    public static void runRequestAndCheckResult(List<String> dataSetIDs, AnalysisRequestStatus.State expectedState, int expectedProcessedDiscoveryRequests) throws Exception {
        AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
        String id = runRequest(dataSetIDs, analysisManager);
        log.info("Running request " + id + " on data sets: " + dataSetIDs);

        PollOutcome outcome = pollUntilSettled(analysisManager, id, MAX_NUMBER_OF_POLLS, WAIT_MS_BETWEEN_POLLING, NUMBERED_POLL_MESSAGE, expectedState);
        AnalysisRequestStatus status = outcome.status;
        log.log(Level.INFO, "Polling result after {0} polls for request id {1}: status: {2}", new Object[] { outcome.polls, id, status.getState() });

        // Decide "timed out" from the observed state rather than from the remaining poll budget,
        // so a request that settles exactly on the last allowed poll is not reported as a timeout.
        Assert.assertFalse("Request " + id + " did not settle within " + MAX_NUMBER_OF_POLLS + " polls", outcome.timedOut());
        Assert.assertEquals(expectedState, status.getState());

        AnalysisRequestTrackerStore store = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class);
        AnalysisRequestTracker tracker = store.query(id);
        Assert.assertNotNull(tracker);
        checkTracker(tracker, expectedProcessedDiscoveryRequests);
        log.info("Status details: " + status.getDetails());
    }

    /**
     * Asserts the number of discovery service responses stored in the tracker.
     *
     * @param expectedProcessedDiscoveryRequests expected response count; -1 means "as many
     *                                           responses as the tracker has requests"
     */
    static void checkTracker(AnalysisRequestTracker tracker, int expectedProcessedDiscoveryRequests) {
        int expectedResponses = expectedProcessedDiscoveryRequests;
        if (expectedResponses == -1) {
            expectedResponses = tracker.getDiscoveryServiceRequests().size();
        }
        Assert.assertEquals(expectedResponses, tracker.getDiscoveryServiceResponses().size());
    }

    /** Single-data-set convenience overload of {@link #runRequest(List, AnalysisManager)}. */
    static String runRequest(String dataSetID, AnalysisManager analysisManager) throws Exception {
        return runRequest(Collections.singletonList(dataSetID), analysisManager);
    }

    /**
     * Builds a request for the given data sets, submits it and asserts that it was accepted.
     *
     * @return the id assigned to the request (never null)
     */
    public static String runRequest(List<String> dataSetIDs, AnalysisManager analysisManager) throws Exception {
        AnalysisRequest request = createAnalysisRequest(dataSetIDs);
        log.info("Starting analyis");
        AnalysisResponse response = analysisManager.runAnalysis(request);
        Assert.assertNotNull(response);
        Assert.assertFalse(response.isInvalidRequest());
        String id = response.getId();
        Assert.assertNotNull(id);
        return id;
    }

    @Test
    public void testSimpleSuccess() throws Exception {
        runRequestAndCheckResult("successID", AnalysisRequestStatus.State.FINISHED, -1);
    }

    /** Waits for the request using the default budget of {@link #MAX_NUMBER_OF_POLLS} polls. */
    public static void waitForRequest(String requestId, AnalysisManager analysisManager) {
        waitForRequest(requestId, analysisManager, MAX_NUMBER_OF_POLLS);
    }

    /**
     * Polls the request until it settles or {@code maxPolls} polls were made. Does not fail when
     * the budget runs out; the outcome is only logged.
     *
     * @throws RuntimeException wrapping the InterruptedException if the thread is interrupted
     */
    public static void waitForRequest(String requestId, AnalysisManager analysisManager, int maxPolls) {
        log.log(Level.INFO, "Waiting for request ''{0}'' to finish", requestId);
        PollOutcome outcome = pollUntilSettledUnchecked(analysisManager, requestId, maxPolls);
        if (outcome.timedOut()) {
            // The apostrophe is doubled because the pattern goes through MessageFormat.
            log.log(Level.INFO, "Request ''{0}'' is not finished yet, don''t wait for it", requestId);
        } else {
            log.log(Level.INFO, "Request ''{0}'' is finished with state: ''{1}''", new Object[] { requestId, outcome.status.getState() });
        }
    }

    /**
     * Polls the request until it settles or {@code maxPolls} polls were made.
     *
     * @return true if the last observed state equals {@code expectedState}
     * @throws RuntimeException wrapping the InterruptedException if the thread is interrupted
     */
    public static boolean waitForRequest(String requestId, AnalysisManager analysisManager, int maxPolls, AnalysisRequestStatus.State expectedState) {
        log.log(Level.INFO, "Waiting for request ''{0}'' to finish", requestId);
        PollOutcome outcome = pollUntilSettledUnchecked(analysisManager, requestId, maxPolls);
        if (outcome.timedOut()) {
            log.log(Level.INFO, "Request ''{0}'' is not finished yet, don''t wait for it", requestId);
        }
        return expectedState.equals(outcome.status.getState());
    }

    @Test
    public void testSimpleSuccessDuplicate() throws Exception {
        AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
        String id = runRequest("successID", analysisManager);
        String secondId = runRequest("successID", analysisManager);
        Assert.assertNotEquals(id, secondId);
        // Wait past the "ignore similar requests" time span, then check that a new analysis is started
        Thread.sleep(DefaultStatusQueueStore.IGNORE_SIMILAR_REQUESTS_TIMESPAN_MS * 2 + 5000);
        String thirdId = runRequest("successID", analysisManager);
        Assert.assertNotEquals(secondId, thirdId);
        waitForRequest(id, analysisManager);
        waitForRequest(thirdId, analysisManager);
    }

    @Test
    public void testSimpleSuccessNoDuplicate() throws Exception {
        AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
        String id = runRequest("successID", analysisManager);
        String secondId = runRequest("successID2", analysisManager);
        Assert.assertNotEquals(id, secondId);
        waitForRequest(id, analysisManager);
        waitForRequest(secondId, analysisManager);
    }

    @Test
    public void testSimpleSuccessDuplicateSubset() throws Exception {
        AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
        String id = runRequest(Arrays.asList("successID", "successID2", "successID3"), analysisManager);
        String secondId = runRequest("successID2", analysisManager);
        Assert.assertNotEquals(id, secondId);
        Thread.sleep(DefaultStatusQueueStore.IGNORE_SIMILAR_REQUESTS_TIMESPAN_MS + 5000);
        String thirdId = runRequest("successID", analysisManager);
        Assert.assertNotEquals(secondId, thirdId);
        waitForRequest(id, analysisManager);
        waitForRequest(thirdId, analysisManager);
    }

    /**
     * This test depends on the speed of execution.
     * An analysis that is not in state INITIALIZED or IN_SERVICE_QUEUE cannot be cancelled.
     * Therefore if the analysis is started too quickly this test will fail!
     *
     * Ignore for now as this can go wrong in the build.
     */
    @Test
    @Ignore
    public void testCancelRequest() throws Exception {
        AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();
        String id = runRequest(Arrays.asList("successID", "successID2", "successID3"), analysisManager);
        AnalysisCancelResult cancelAnalysisRequest = analysisManager.cancelAnalysisRequest(id);
        // JUnit order is (expected, actual); otherwise the failure message is reversed.
        Assert.assertEquals(AnalysisCancelResult.State.SUCCESS, cancelAnalysisRequest.getState());
        String secondId = runRequest("successID2", analysisManager);
        Assert.assertNotEquals(id, secondId);
    }

    @Test
    public void testRequestsWithDataSetListSuccess() throws Exception {
        runRequestAndCheckResult(Arrays.asList("success1", "success2", "success3"), AnalysisRequestStatus.State.FINISHED, 6);
    }

    @Test
    public void testRequestsWithDataSetListError() throws Exception {
        runRequestAndCheckResult(Arrays.asList("success1", "error2", "success3"), AnalysisRequestStatus.State.ERROR, 3);
    }

    @Test
    public void testSimpleFailure() throws Exception {
        runRequestAndCheckResult("errorID", AnalysisRequestStatus.State.ERROR, 1);
    }

    @Test
    public void testManyRequests() throws Exception {
        List<String> dataSets = new ArrayList<String>();
        List<AnalysisRequestStatus.State> expectedStates = new ArrayList<AnalysisRequestStatus.State>();
        int dataSetNum = 5;
        for (int i = 0; i < dataSetNum; i++) {
            if (i % 3 == 0) {
                // every third data set should fail
                dataSets.add("errorDataSet" + i);
                expectedStates.add(AnalysisRequestStatus.State.ERROR);
            } else {
                dataSets.add("successdataSet" + i);
                expectedStates.add(AnalysisRequestStatus.State.FINISHED);
            }
        }

        runRequests(dataSets, expectedStates);
    }

    /**
     * Submits one request per data set, then polls all of them round-robin (up to 10 passes with
     * up to 3 polls of one second per request and pass) and finally asserts that every request
     * settled in its expected state.
     *
     * @param dataSetIDs     data set ids, one request is started for each
     * @param expectedStates expected final state per data set, same order and size as {@code dataSetIDs}
     */
    public void runRequests(List<String> dataSetIDs, List<AnalysisRequestStatus.State> expectedStates) throws Exception {
        Assert.assertEquals(dataSetIDs.size(), expectedStates.size());
        AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager();

        // Insertion-ordered so that requests are polled (and logged) in submission order.
        Map<AnalysisRequest, AnalysisRequestStatus.State> request2ExpectedState = new LinkedHashMap<AnalysisRequest, AnalysisRequestStatus.State>();
        for (int i = 0; i < dataSetIDs.size(); i++) {
            AnalysisRequest request = createAnalysisRequest(Collections.singletonList(dataSetIDs.get(i)));

            log.info("Starting analyis");
            AnalysisResponse response = analysisManager.runAnalysis(request);
            Assert.assertNotNull(response);
            Assert.assertFalse(response.isInvalidRequest());
            String id = response.getId();
            Assert.assertNotNull(id);
            // Set the id before the request becomes a map key, in case it takes part in equals/hashCode.
            request.setId(id);
            request2ExpectedState.put(request, expectedStates.get(i));
        }

        Map<AnalysisRequest, AnalysisRequestStatus> actualFinalStatePerRequest = new HashMap<AnalysisRequest, AnalysisRequestStatus>();
        final int maxPollPasses = 10;
        final int maxPollsPerRequest = 3;
        final long waitMsPerPoll = 1000;
        for (int pass = 0; pass < maxPollPasses && actualFinalStatePerRequest.size() < request2ExpectedState.size(); pass++) {
            log.info("Polling all requests for the " + pass + " th time");
            for (Map.Entry<AnalysisRequest, AnalysisRequestStatus.State> entry : request2ExpectedState.entrySet()) {
                AnalysisRequest request = entry.getKey();
                String id = request.getId();
                if (actualFinalStatePerRequest.containsKey(request)) {
                    log.log(Level.INFO, "Request with ID ''{0}'' already finished, skipping it", id);
                    continue;
                }
                PollOutcome outcome = pollUntilSettled(analysisManager, id, maxPollsPerRequest, waitMsPerPoll, POLL_MESSAGE_WITH_EXPECTED_STATE, entry.getValue());
                if (!outcome.timedOut()) {
                    // final state found (also when it shows up on the last poll of this pass)
                    actualFinalStatePerRequest.put(request, outcome.status);
                }
            }
        }
        if (actualFinalStatePerRequest.size() == request2ExpectedState.size()) {
            log.info("All requests finished");
        }

        Assert.assertEquals(request2ExpectedState.size(), actualFinalStatePerRequest.size());
        Assert.assertEquals(request2ExpectedState.keySet(), actualFinalStatePerRequest.keySet());
        for (Map.Entry<AnalysisRequest, AnalysisRequestStatus> actual : actualFinalStatePerRequest.entrySet()) {
            AnalysisRequest req = actual.getKey();
            Assert.assertNotNull(req);
            AnalysisRequestStatus.State expectedState = request2ExpectedState.get(req);
            Assert.assertNotNull(expectedState);
            AnalysisRequestStatus.State actualState = actual.getValue().getState();
            Assert.assertNotNull(actualState);

            log.log(Level.INFO, "Checking request ID ''{0}'', actual state: ''{1}'', expected state: ''{2}''", new Object[] { req.getId(), actualState, expectedState });
            Assert.assertEquals(expectedState, actualState);
        }
    }

    /**
     * Creates an analysis request for the given data sets that runs the fixed service sequence
     * "asynctestservice", "synctestservice" and carries a few dummy additional properties.
     *
     * <p>Side effect: all data in the (default) metadata store is reset on every call, and a
     * dummy {@link UnknownDataSet} is created for every id that starts with
     * {@link #DUMMY_SUCCESS_ID} or {@link #DUMMY_ERROR_ID}.
     *
     * @throws RuntimeException if the configured metadata store is not a {@link DefaultMetadataStore}
     */
    public static AnalysisRequest createAnalysisRequest(List<String> dataSetIDs) throws JSONException {
        MetadataStore mds = new ODFFactory().create().getMetadataStore();
        if (!(mds instanceof DefaultMetadataStore)) {
            throw new RuntimeException(MessageFormat.format("This tests does not work with metadata store implementation \"{0}\" but only with the DefaultMetadataStore.", mds.getClass().getName()));
        }
        DefaultMetadataStore defaultMds = (DefaultMetadataStore) mds;
        defaultMds.resetAllData();

        List<MetaDataObjectReference> dataSetRefs = new ArrayList<>();
        for (String id : dataSetIDs) {
            MetaDataObjectReference mdr = new MetaDataObjectReference();
            mdr.setId(id);
            dataSetRefs.add(mdr);
            if (id.startsWith(DUMMY_SUCCESS_ID) || id.startsWith(DUMMY_ERROR_ID)) {
                log.info("Creating dummy data set for reference : " + id);
                DataSet ds = new UnknownDataSet();
                ds.setReference(mdr);
                defaultMds.createObject(ds);
            }
        }
        defaultMds.commit();

        AnalysisRequest request = new AnalysisRequest();
        request.setDataSets(dataSetRefs);
        // A fixed list of services is used on purpose instead of all registered services.
        request.setDiscoveryServiceSequence(Arrays.asList("asynctestservice", "synctestservice"));

        Map<String, Object> additionalProps = new HashMap<String, Object>();
        additionalProps.put("aaa", "bbb");
        JSONObject jo = new JSONObject();
        jo.put("p1", "v1");
        jo.put("p2", "v2");
        additionalProps.put("jo", jo);
        request.setAdditionalProperties(additionalProps);
        return request;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ParallelODFTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ParallelODFTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ParallelODFTest.java new file mode 100755 index 0000000..9aa3ba4 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/ParallelODFTest.java @@ -0,0 +1,101 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.test.controlcenter; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.analysis.AnalysisRequest; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.analysis.AnalysisManager; +import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus; +import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State; +import org.apache.atlas.odf.api.analysis.AnalysisResponse; +import org.apache.atlas.odf.api.engine.EngineManager; +import org.apache.atlas.odf.api.engine.SystemHealth; +import org.apache.atlas.odf.api.engine.SystemHealth.HealthStatus; +import org.apache.atlas.odf.core.Utils; +import org.apache.atlas.odf.core.analysis.AnalysisManagerImpl; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.core.test.ODFTestcase; + +public class ParallelODFTest extends ODFTestcase { + Logger log = ODFTestLogger.get(); + + @Test + public void runDataSetsInParallelSuccess() throws Exception { + runDataSetsInParallelAndCheckResult(Arrays.asList(new String[] { "successID1", "successID2" }), State.FINISHED); + } + + @Test + public void runDataSetsInParallelError() throws Exception { + runDataSetsInParallelAndCheckResult(Arrays.asList(new String[] { "successID1", "errorID2" }), State.ERROR); + } + + private void runDataSetsInParallelAndCheckResult(List<String> dataSetIDs, State expectedState) throws Exception { + log.info("Running data sets in parallel: " + dataSetIDs); + log.info("Expected state: " + expectedState); + AnalysisRequest req = ODFAPITest.createAnalysisRequest(dataSetIDs); + // Enable parallel processing because this is a parallel test + req.setProcessDataSetsSequentially(false); + AnalysisManager analysisManager = new ODFFactory().create().getAnalysisManager(); + EngineManager engineManager = 
new ODFFactory().create().getEngineManager(); + + SystemHealth healthCheckResult = engineManager.checkHealthStatus(); + Assert.assertEquals(HealthStatus.OK, healthCheckResult.getStatus()); + AnalysisResponse resp = analysisManager.runAnalysis(req); + log.info("Parallel requests started"); + + String id = resp.getId(); + List<String> singleIds = Utils.splitString(id, AnalysisManagerImpl.COMPOUND_REQUEST_SEPARATOR); + List<String> singleDetails = Utils.splitString(resp.getDetails(), AnalysisManagerImpl.COMPOUND_REQUEST_SEPARATOR); + Assert.assertEquals(dataSetIDs.size(), singleIds.size()); + Assert.assertEquals(dataSetIDs.size(), singleDetails.size()); + + AnalysisRequestStatus status = null; + + // check that requests are processed in parallel: + // there must be a point in time where both requests are in status "active" + log.info("Polling for status of parallel request..."); + boolean foundPointInTimeWhereBothRequestsAreActive = false; + int maxPolls = ODFAPITest.MAX_NUMBER_OF_POLLS; + do { + List<State> allSingleStates = new ArrayList<AnalysisRequestStatus.State>(); + for (String singleId : singleIds) { + allSingleStates.add(analysisManager.getAnalysisRequestStatus(singleId).getState()); + } + if (Utils.containsOnly(allSingleStates, new State[] { State.ACTIVE })) { + foundPointInTimeWhereBothRequestsAreActive = true; + } + + status = analysisManager.getAnalysisRequestStatus(id); + log.log(Level.INFO, "Poll request for parallel request ID ''{0}'' (expected state: ''{3}''): state: ''{1}'', details: ''{2}''", new Object[] { id, status.getState(), status.getDetails(), + expectedState }); + log.info("States of single requests: " + singleIds + ": " + allSingleStates); + maxPolls--; + Thread.sleep(ODFAPITest.WAIT_MS_BETWEEN_POLLING); + } while (maxPolls > 0 && (status.getState() == State.ACTIVE || status.getState() == State.QUEUED)); + + Assert.assertTrue(maxPolls > 0); + Assert.assertEquals(expectedState, status.getState()); + 
Assert.assertTrue(foundPointInTimeWhereBothRequestsAreActive); + log.info("Parallel request status details: " + status.getDetails()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/SetTrackerStatusTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/SetTrackerStatusTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/SetTrackerStatusTest.java new file mode 100755 index 0000000..9a43b78 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/controlcenter/SetTrackerStatusTest.java @@ -0,0 +1,66 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.test.controlcenter; + +import java.util.logging.Level; + +import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.analysis.AnalysisManager; +import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS; +import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker; +import org.apache.atlas.odf.core.controlcenter.AnalysisRequestTrackerStore; +import org.apache.atlas.odf.core.test.ODFTestBase; + +public class SetTrackerStatusTest extends ODFTestBase { + + @Test + public void testSetTrackerStatus() throws Exception { + AnalysisManager am = new ODFFactory().create().getAnalysisManager(); + AnalysisRequestTrackerStore arts = new ODFInternalFactory().create(AnalysisRequestTrackerStore.class); + String requestId = ODFAPITest.runRequest("successId", am); + Thread.sleep(1000); + long cutOffTimestamp = System.currentTimeMillis(); + String testMessage = "Message was set to error at " + cutOffTimestamp; + arts.setStatusOfOldRequest(cutOffTimestamp, STATUS.ERROR, testMessage); + AnalysisRequestTracker tracker = arts.query(requestId); + Assert.assertEquals(STATUS.ERROR, tracker.getStatus()); + Assert.assertEquals(testMessage, tracker.getStatusDetails()); + + // wait until request is finished and state is set back to finished + log.log(Level.INFO, "Waiting for request ''{0}'' to finish", requestId); + int maxPolls = ODFAPITest.MAX_NUMBER_OF_POLLS; + AnalysisRequestStatus status = null; + do { + status = am.getAnalysisRequestStatus(requestId); + log.log(Level.INFO, "Poll request for request ID ''{0}'', state: ''{1}'', details: ''{2}''", new Object[] { requestId, status.getState(), status.getDetails() }); + maxPolls--; + try { + Thread.sleep(ODFAPITest.WAIT_MS_BETWEEN_POLLING); + } catch (InterruptedException e) { + // TODO 
Auto-generated catch block + e.printStackTrace(); + } + } while (maxPolls > 0 && (status.getState() != AnalysisRequestStatus.State.FINISHED) ); + + Assert.assertEquals(AnalysisRequestStatus.State.FINISHED, am.getAnalysisRequestStatus(requestId).getState()); + tracker = arts.query(requestId); + Assert.assertEquals(STATUS.FINISHED, tracker.getStatus()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/DiscoveryServiceManagerTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/DiscoveryServiceManagerTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/DiscoveryServiceManagerTest.java new file mode 100755 index 0000000..0f1aa8f --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/DiscoveryServiceManagerTest.java @@ -0,0 +1,135 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.test.discoveryservice; + +import java.io.InputStream; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.settings.validation.ValidationException; +import org.apache.atlas.odf.json.JSONUtils; +import org.apache.wink.json4j.JSONException; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceEndpoint; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceJavaEndpoint; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRuntimeStatistics; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceStatus; +import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException; + +public class DiscoveryServiceManagerTest { + + final private static String ASYNCTESTWA_SERVICE_ID = "asynctestservice-with-annotations"; + + final private static String NEW_SERVICE_ID = "New_Service"; + final private static String NEW_SERVICE_NAME = "Name of New Service"; + final private static String NEW_SERVICE_DESCRIPTION = "Description of the New Service"; + final private static String NEW_SERVICE_CLASSNAME = "TestAsyncDiscoveryService1"; + + final private static String UPDATED_SERVICE_DESCRIPTION = "Updated description of the New Service"; + final private static String UPDATED_SERVICE_CLASSNAME = "TestSyncDiscoveryService1"; + + private void registerDiscoveryService(DiscoveryServiceProperties dsProperties) throws ValidationException { + DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager(); + discoveryServicesManager.createDiscoveryService(dsProperties); + } + + private void replaceDiscoveryService(DiscoveryServiceProperties dsProperties) throws ValidationException { + DiscoveryServiceManager 
discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager(); + discoveryServicesManager.replaceDiscoveryService(dsProperties); + } + + private void unregisterDiscoveryService(String serviceId) throws ServiceNotFoundException, ValidationException { + DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager(); + discoveryServicesManager.deleteDiscoveryService(serviceId); + } + + @Test + public void testGetDiscoveryServiceProperties() throws ServiceNotFoundException { + DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager(); + DiscoveryServiceProperties dsProperties = discoveryServicesManager.getDiscoveryServiceProperties(ASYNCTESTWA_SERVICE_ID); + Assert.assertNotNull(dsProperties); + } + + + @Ignore @Test // Ignoring testcase due to problem on Mac (issue #56) + public void testGetDiscoveryServiceStatus() throws ServiceNotFoundException { + DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager(); + DiscoveryServiceStatus dsStatus = discoveryServicesManager.getDiscoveryServiceStatus(ASYNCTESTWA_SERVICE_ID); + Assert.assertNotNull(dsStatus); + } + + @Test // TODO: need to adjust as soon as runtime statistics are available + public void testGetDiscoveryServiceRuntimeStatistics() throws ServiceNotFoundException { + DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager(); + DiscoveryServiceRuntimeStatistics dsRuntimeStats = discoveryServicesManager.getDiscoveryServiceRuntimeStatistics(ASYNCTESTWA_SERVICE_ID); + Assert.assertNotNull(dsRuntimeStats); + long avgProcTime = dsRuntimeStats.getAverageProcessingTimePerItemInMillis(); + Assert.assertEquals(0, avgProcTime); + } + + @Test + public void testDeleteDiscoveryServiceRuntimeStatistics() throws ServiceNotFoundException { + DiscoveryServiceManager discoveryServicesManager = new 
ODFFactory().create().getDiscoveryServiceManager(); + discoveryServicesManager.deleteDiscoveryServiceRuntimeStatistics(ASYNCTESTWA_SERVICE_ID); + } + + @Test + public void testGetDiscoveryServiceImage() throws ServiceNotFoundException { + DiscoveryServiceManager discoveryServicesManager = new ODFFactory().create().getDiscoveryServiceManager(); + InputStream is = discoveryServicesManager.getDiscoveryServiceImage(ASYNCTESTWA_SERVICE_ID); + Assert.assertNull(is); + } + + @Test + public void testCreateUpdateDelete() throws ServiceNotFoundException, ValidationException, JSONException { + DiscoveryServiceJavaEndpoint dse = new DiscoveryServiceJavaEndpoint(); + dse.setClassName(NEW_SERVICE_CLASSNAME); + DiscoveryServiceProperties dsProperties = new DiscoveryServiceProperties(); + dsProperties.setId(NEW_SERVICE_ID); + dsProperties.setName(NEW_SERVICE_NAME); + dsProperties.setDescription(NEW_SERVICE_DESCRIPTION); + dsProperties.setLink(null); + dsProperties.setPrerequisiteAnnotationTypes(null); + dsProperties.setResultingAnnotationTypes(null); + dsProperties.setSupportedObjectTypes(null); + dsProperties.setAssignedObjectTypes(null); + dsProperties.setAssignedObjectCandidates(null); + dsProperties.setEndpoint(JSONUtils.convert(dse, DiscoveryServiceEndpoint.class)); + dsProperties.setParallelismCount(2); + registerDiscoveryService(dsProperties); + + DiscoveryServiceJavaEndpoint dse2 = new DiscoveryServiceJavaEndpoint(); + dse2.setClassName(UPDATED_SERVICE_CLASSNAME); + DiscoveryServiceProperties dsProperties2 = new DiscoveryServiceProperties(); + dsProperties2.setId(NEW_SERVICE_ID); + dsProperties2.setName(NEW_SERVICE_NAME); + dsProperties2.setDescription(UPDATED_SERVICE_DESCRIPTION); + dsProperties2.setLink(null); + dsProperties.setPrerequisiteAnnotationTypes(null); + dsProperties.setResultingAnnotationTypes(null); + dsProperties.setSupportedObjectTypes(null); + dsProperties.setAssignedObjectTypes(null); + dsProperties.setAssignedObjectCandidates(null); + 
dsProperties2.setEndpoint(JSONUtils.convert(dse2, DiscoveryServiceEndpoint.class)); + dsProperties2.setParallelismCount(2); + replaceDiscoveryService(dsProperties2); + + unregisterDiscoveryService(NEW_SERVICE_ID); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryService1.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryService1.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryService1.java new file mode 100755 index 0000000..2ea85b7 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryService1.java @@ -0,0 +1,227 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.test.discoveryservice; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse; +import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncRunStatus; +import org.junit.Assert; + +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService; +import org.apache.atlas.odf.core.Environment; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.core.Utils; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse; +import org.apache.atlas.odf.json.JSONUtils; + +public class TestAsyncDiscoveryService1 extends DiscoveryServiceBase implements AsyncDiscoveryService { + + static int unavailableCounter = 0; + + static Logger logger = ODFTestLogger.get(); + + public static void checkUserAndAdditionalProperties(DiscoveryServiceRequest request) { + String user = request.getUser(); + + String defaultUser = System.getProperty("user.name"); + Assert.assertEquals(defaultUser, user); + + Map<String, Object> additionalProperties = request.getAdditionalProperties(); + logger.info("TestAsyncDiscoveryService1.startAnalysis additional properties: " + additionalProperties); + Assert.assertNotNull(additionalProperties); + + // check that environment entries are also available additional properties + Environment ev = new ODFInternalFactory().create(Environment.class); + String dsId = request.getDiscoveryServiceId(); + Map<String, String> serviceEnvProps = ev.getPropertiesWithPrefix(dsId); + if (!serviceEnvProps.isEmpty()) { + 
Assert.assertTrue(!additionalProperties.isEmpty()); + for (Map.Entry<String, String> serviceEnvProp : serviceEnvProps.entrySet()) { + String key = serviceEnvProp.getKey(); + String val = serviceEnvProp.getValue(); + logger.info("Found discoveryservice configuration parameter: " + key + " with value " + val); + Assert.assertTrue(key.startsWith(dsId)); + Assert.assertTrue(additionalProperties.containsKey(key) ); + Assert.assertEquals(val, additionalProperties.get(key)); + } + } + + if (!additionalProperties.isEmpty()) { + Assert.assertTrue(additionalProperties.containsKey("aaa")); + Assert.assertTrue("bbb".equals(additionalProperties.get("aaa"))); + Assert.assertTrue(additionalProperties.containsKey("jo")); + @SuppressWarnings("unchecked") + Map<String, Object> m = (Map<String, Object>) additionalProperties.get("jo"); + Assert.assertTrue("v1".equals(m.get("p1"))); + Assert.assertTrue("v2".equals(m.get("p2"))); + /* + if (!additionalProperties.containsKey("aaa")) { + response.setCode(ResponseCode.UNKNOWN_ERROR); + response.setDetails("Additional property value 'aaa' doesn't exist"); + return; + } + if (!"bbb".equals(additionalProperties.get("aaa"))) { + response.setCode(ResponseCode.UNKNOWN_ERROR); + response.setDetails("Additional properties 'aaa' has wrong value"); + return; + } + if (!additionalProperties.containsKey("jo")) { + response.setCode(ResponseCode.UNKNOWN_ERROR); + response.setDetails("Additional property value 'jo' doesn't exist"); + return; + } + Map m = (Map) additionalProperties.get("jo"); + if (!"v1".equals(m.get("p1"))) { + response.setCode(ResponseCode.UNKNOWN_ERROR); + response.setDetails("Additional property value 'jo.p1' doesn't exist"); + return; + + } + if (!"v2".equals(m.get("p2"))) { + response.setCode(ResponseCode.UNKNOWN_ERROR); + response.setDetails("Additional property value 'jo.p2' doesn't exist"); + return; + } + */ + } + } + + @Override + public DiscoveryServiceAsyncStartResponse startAnalysis(DiscoveryServiceRequest request) { + try 
{ + DiscoveryServiceResponse.ResponseCode code = DiscoveryServiceResponse.ResponseCode.TEMPORARILY_UNAVAILABLE; + String details = "Cannot answer right now"; + if (unavailableCounter % 2 == 0) { + code = DiscoveryServiceResponse.ResponseCode.OK; + details = "Everything's peachy"; + } + unavailableCounter++; + /* + if (unavailableCounter % 3 == 0) { + code = CODE.NOT_AUTHORIZED; + details = "You have no power here!"; + } + */ + DiscoveryServiceAsyncStartResponse response = new DiscoveryServiceAsyncStartResponse(); + response.setCode(code); + response.setDetails(details); + if (code == DiscoveryServiceResponse.ResponseCode.OK) { + String runid = "TestAsyncService1" + UUID.randomUUID().toString(); + synchronized (lock) { + runIDsRunning.put(runid, 4); // return status "running" 4 times before finishing + } + response.setRunId(runid); + String dataSetId = request.getDataSetContainer().getDataSet().getReference().getId(); + if (dataSetId.startsWith("error")) { + logger.info("TestAsync Discovery Service run " + runid + " will fail"); + runIDsWithError.add(runid); + } else { + logger.info("TestAsync Discovery Service run " + runid + " will succeed"); + } + } + logger.info("TestAsyncDiscoveryService1.startAnalysis returns: " + JSONUtils.lazyJSONSerializer(response)); + checkUserAndAdditionalProperties(request); + /* + String user = request.getUser(); + Assert.assertEquals(TestControlCenter.TEST_USER_ID, user); + + Map<String, Object> additionalProperties = request.getAdditionalProperties(); + logger.info("TestAsyncDiscoveryService1.startAnalysis additional properties: " + additionalProperties); + Assert.assertNotNull(additionalProperties); + if (!additionalProperties.isEmpty()) { + if (!additionalProperties.containsKey("aaa")) { + response.setCode(ResponseCode.UNKNOWN_ERROR); + response.setDetails("Additional property value 'aaa' doesn't exist"); + return response; + } + if (!"bbb".equals(additionalProperties.get("aaa"))) { + response.setCode(ResponseCode.UNKNOWN_ERROR); + 
response.setDetails("Additional properties 'aaa' has wrong value"); + return response; + } + if (!additionalProperties.containsKey("jo")) { + response.setCode(ResponseCode.UNKNOWN_ERROR); + response.setDetails("Additional property value 'jo' doesn't exist"); + return response; + } + Map m = (Map) additionalProperties.get("jo"); + if (!"v1".equals(m.get("p1"))) { + response.setCode(ResponseCode.UNKNOWN_ERROR); + response.setDetails("Additional property value 'jo.p1' doesn't exist"); + return response; + + } + if (!"v2".equals(m.get("p2"))) { + response.setCode(ResponseCode.UNKNOWN_ERROR); + response.setDetails("Additional property value 'jo.p2' doesn't exist"); + return response; + } + } + */ + return response; + } catch (Throwable t) { + DiscoveryServiceAsyncStartResponse response = new DiscoveryServiceAsyncStartResponse(); + response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR); + response.setDetails(Utils.getExceptionAsString(t)); + return response; + } + } + + static Object lock = new Object(); + static Map<String, Integer> runIDsRunning = new HashMap<String, Integer>(); + static Set<String> runIDsWithError = Collections.synchronizedSet(new HashSet<String>()); + + // static Map<String, Integer> requestIDUnavailable = new HashMap<>(); + + @Override + public DiscoveryServiceAsyncRunStatus getStatus(String runId) { + String details = "Run like the wind"; + DiscoveryServiceAsyncRunStatus.State state = DiscoveryServiceAsyncRunStatus.State.RUNNING; + synchronized (lock) { + Integer i = runIDsRunning.get(runId); + Assert.assertNotNull(i); + if (i.intValue() == 0) { + if (runIDsWithError.contains(runId)) { + state = DiscoveryServiceAsyncRunStatus.State.ERROR; + details = "This was a mistake"; + } else { + state = DiscoveryServiceAsyncRunStatus.State.FINISHED; + details = "Finish him!"; + } + } else { + runIDsRunning.put(runId, i - 1); + } + } + + DiscoveryServiceAsyncRunStatus status = new DiscoveryServiceAsyncRunStatus(); + status.setRunId(runId); + 
status.setDetails(details); + status.setState(state); + logger.info("TestAsyncDiscoveryService1.getStatus returns: " + JSONUtils.lazyJSONSerializer(status)); + + return status; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryServiceWritingAnnotations1.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryServiceWritingAnnotations1.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryServiceWritingAnnotations1.java new file mode 100755 index 0000000..bd2f1a6 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestAsyncDiscoveryServiceWritingAnnotations1.java @@ -0,0 +1,99 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.test.discoveryservice; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.metadata.MetaDataObjectReference; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService; +import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncRunStatus; +import org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse; + +public class TestAsyncDiscoveryServiceWritingAnnotations1 extends DiscoveryServiceBase implements AsyncDiscoveryService { + + static Logger logger = ODFTestLogger.get(); + + static Map<String, MyThread> id2Thread = Collections.synchronizedMap(new HashMap<String, MyThread>()); + + class MyThread extends Thread { + + String errorMessage = null; + String correlationId; + MetaDataObjectReference dataSetRef; + + public MyThread(MetaDataObjectReference dataSetRef, String correlationId) { + super(); + this.dataSetRef = dataSetRef; + this.correlationId = correlationId; + } + + @Override + public void run() { + this.errorMessage = TestSyncDiscoveryServiceWritingAnnotations1.createAnnotations(dataSetRef, correlationId, metadataStore, annotationStore); + } + + } + + @Override + public DiscoveryServiceAsyncStartResponse startAnalysis(DiscoveryServiceRequest request) { + DiscoveryServiceAsyncStartResponse response = new DiscoveryServiceAsyncStartResponse(); + MetaDataObjectReference dataSetRef = request.getDataSetContainer().getDataSet().getReference(); + + String newRunID = "RunId-" + this.getClass().getSimpleName() + "-" + UUID.randomUUID().toString(); + MyThread t = new 
MyThread(dataSetRef, (String) request.getAdditionalProperties().get(TestSyncDiscoveryServiceWritingAnnotations1.REQUEST_PROPERTY_CORRELATION_ID)); + t.start(); + id2Thread.put(newRunID, t); + response.setCode(DiscoveryServiceResponse.ResponseCode.OK); + response.setRunId(newRunID); + response.setDetails("Thread started"); + logger.info("Analysis writing annotations has started"); + + return response; + } + + @Override + public DiscoveryServiceAsyncRunStatus getStatus(String runId) { + DiscoveryServiceAsyncRunStatus status = new DiscoveryServiceAsyncRunStatus(); + + MyThread t = id2Thread.get(runId); + status.setRunId(runId); + if (t == null) { + status.setState(DiscoveryServiceAsyncRunStatus.State.NOT_FOUND); + } else { + java.lang.Thread.State ts = t.getState(); + if (!ts.equals(java.lang.Thread.State.TERMINATED)) { + status.setState(DiscoveryServiceAsyncRunStatus.State.RUNNING); + } else { + if (t.errorMessage != null) { + status.setState(DiscoveryServiceAsyncRunStatus.State.ERROR); + status.setDetails(t.errorMessage); + } else { + status.setState(DiscoveryServiceAsyncRunStatus.State.FINISHED); + status.setDetails("All went fine"); + } + } + } + logger.info("Status of analysis with annotations: " + status.getState() + ", " + status.getDetails()); + return status; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryService1.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryService1.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryService1.java new file mode 100755 index 0000000..9ea92f3 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryService1.java @@ -0,0 +1,61 @@ +/** + * Licensed under the Apache License, 
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.discoveryservice; + +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse; +import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse; + +public class TestSyncDiscoveryService1 extends DiscoveryServiceBase implements SyncDiscoveryService { + static int unavailableCounter = 0; + + Logger logger = ODFTestLogger.get(); + + @Override + public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) { + try { + DiscoveryServiceResponse.ResponseCode code = DiscoveryServiceResponse.ResponseCode.TEMPORARILY_UNAVAILABLE; + String details = "Cannot answer right now synchronously"; + if (unavailableCounter % 2 == 0) { + code = DiscoveryServiceResponse.ResponseCode.OK; + details = "Everything's peachy and synchronous"; + } + unavailableCounter++; + DiscoveryServiceSyncResponse response = new DiscoveryServiceSyncResponse(); + response.setDetails(details); + response.setCode(code); + if (code == DiscoveryServiceResponse.ResponseCode.OK) { + String dataSetId = request.getDataSetContainer().getDataSet().getReference().getId(); + if 
(dataSetId.startsWith("error")) { + response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR); + response.setDetails("Something went synchronously wrong!"); + } else { + response.setDetails("All is synchronously fine!"); + } + TestAsyncDiscoveryService1.checkUserAndAdditionalProperties(request); + } + logger.info(this.getClass().getSimpleName() + " service returned with code: " + response.getCode()); + return response; + } catch (Throwable t) { + t.printStackTrace(); + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryServiceWritingAnnotations1.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryServiceWritingAnnotations1.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryServiceWritingAnnotations1.java new file mode 100755 index 0000000..62c7bf6 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/discoveryservice/TestSyncDiscoveryServiceWritingAnnotations1.java @@ -0,0 +1,156 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.test.discoveryservice; + +import java.util.HashSet; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceBase; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse; +import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse; +import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService; +import org.apache.atlas.odf.api.metadata.MetaDataObjectReference; +import org.apache.atlas.odf.api.metadata.MetadataStore; +import org.apache.wink.json4j.JSONObject; +import org.junit.Assert; + +import org.apache.atlas.odf.api.metadata.models.CachedMetadataStore; +import org.apache.atlas.odf.api.metadata.models.DataSet; +import org.apache.atlas.odf.api.metadata.models.MetaDataCache; +import org.apache.atlas.odf.api.metadata.models.MetaDataObject; +import org.apache.atlas.odf.api.metadata.models.RelationalDataSet; +import org.apache.atlas.odf.api.metadata.models.ProfilingAnnotation; +import org.apache.atlas.odf.core.Utils; +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.annotation.AnnotationStore; + +public class TestSyncDiscoveryServiceWritingAnnotations1 extends DiscoveryServiceBase implements SyncDiscoveryService { + + static Logger logger = Logger.getLogger(TestSyncDiscoveryServiceWritingAnnotations1.class.getName()); + + public static String checkMetaDataCache(DiscoveryServiceRequest request) { + logger.info("Checking metadata cache"); + MetaDataObject mdo = request.getDataSetContainer().getDataSet(); + MetaDataCache cache = request.getDataSetContainer().getMetaDataCache(); + if (cache == null) { + return null; + } + CachedMetadataStore cacheReader = new CachedMetadataStore(cache); + + if (mdo instanceof RelationalDataSet) { + logger.info("Checking metadata cache for 
columns..."); + RelationalDataSet rds = (RelationalDataSet) mdo; + Set<MetaDataObjectReference> cachedColumns = new HashSet<>(); + Set<MetaDataObjectReference> actualColumns = new HashSet<>(); + for (MetaDataObject col : cacheReader.getColumns(rds)) { + cachedColumns.add(col.getReference()); + } + MetadataStore mds = new ODFFactory().create().getMetadataStore(); + for (MetaDataObject col : mds.getColumns(rds)) { + actualColumns.add(col.getReference()); + } + Assert.assertTrue("Columns missing from metadata cache.", cachedColumns.containsAll(actualColumns)); + Assert.assertTrue("Too many columns in metadata cache.", actualColumns.containsAll(cachedColumns)); + } + return null; + } + + @Override + public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) { + logger.info("Analysis started on sync test service with annotations "); + String errorMessage = createAnnotations( // + request.getDataSetContainer().getDataSet().getReference(), // + (String) request.getAdditionalProperties().get(REQUEST_PROPERTY_CORRELATION_ID), // + metadataStore, // + annotationStore); + if (errorMessage == null) { + errorMessage = checkMetaDataCache(request); + } + DiscoveryServiceSyncResponse resp = new DiscoveryServiceSyncResponse(); + if (errorMessage == null) { + resp.setCode(DiscoveryServiceResponse.ResponseCode.OK); + resp.setDetails("Annotations created successfully"); + } else { + resp.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR); + resp.setDetails(errorMessage); + } + logger.info("Analysis finished on sync test service with annotations "); + + return resp; + } + + public static final String REQUEST_PROPERTY_CORRELATION_ID = "REQUEST_PROPERTY_CORRELATION_ID"; + + static final String ANNOTATION_TYPE = "AnnotationType-" + TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName(); + static final String JSON_ATTRIBUTE = "Attribute-" + TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName(); + static final String JSON_VALUE = 
"Value-" + TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName(); + + public static int getNumberOfAnnotations() { + return 3; + } + + public static String[] getPropsOfNthAnnotation(int i) { + return new String[] { ANNOTATION_TYPE + i, JSON_ATTRIBUTE + i, JSON_VALUE + i }; + } + + public static String createAnnotations(MetaDataObjectReference dataSetRef, String correlationId, MetadataStore mds, AnnotationStore as) { + try { + TestSyncDiscoveryServiceWritingAnnotations1.logger.info("Analysis will run on data set ref: " + dataSetRef); + MetaDataObject dataSet = mds.retrieve(dataSetRef); + + String errorMessage = null; + if (dataSet == null) { + errorMessage = "Data set with id " + dataSetRef + " could not be retrieved"; + TestSyncDiscoveryServiceWritingAnnotations1.logger.severe(errorMessage); + return errorMessage; + } + + if (!(dataSet instanceof DataSet)) { + errorMessage = "Object with id " + dataSetRef + " is not a data set"; + TestSyncDiscoveryServiceWritingAnnotations1.logger.severe(errorMessage); + return errorMessage; + } + + // add some annotations + for (int i = 0; i < getNumberOfAnnotations(); i++) { + String[] annotValues = getPropsOfNthAnnotation(i); + ProfilingAnnotation annotation1 = new ProfilingAnnotation(); + annotation1.setProfiledObject(dataSetRef); + annotation1.setAnnotationType(annotValues[0]); + JSONObject jo1 = new JSONObject(); + jo1.put(annotValues[1], annotValues[2]); + jo1.put(REQUEST_PROPERTY_CORRELATION_ID, correlationId); + annotation1.setJsonProperties(jo1.write()); + +// PG: dynamic type creation disabled (types are already created statically) +// mds.createAnnotationTypesFromPrototypes(Collections.singletonList(annotation1)); + MetaDataObjectReference resultRef1 = as.store(annotation1); + if (resultRef1 == null) { + throw new RuntimeException("Annotation object " + i + " could not be created"); + } + } + + TestSyncDiscoveryServiceWritingAnnotations1.logger.info("Discovery service " + 
TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName() + "created annotations successfully"); + } catch (Throwable exc) { + exc.printStackTrace(); + TestSyncDiscoveryServiceWritingAnnotations1.logger.log(Level.WARNING, TestSyncDiscoveryServiceWritingAnnotations1.class.getSimpleName() + " has failed", exc); + return "Failed: " + Utils.getExceptionAsString(exc); + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ODFVersionTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ODFVersionTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ODFVersionTest.java new file mode 100755 index 0000000..2e6d012 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ODFVersionTest.java @@ -0,0 +1,30 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.test.engine; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.engine.ODFVersion; +import org.apache.atlas.odf.core.test.TimerTestBase; + +public class ODFVersionTest extends TimerTestBase { + @Test + public void testVersion() { + ODFVersion version = new ODFFactory().create().getEngineManager().getVersion(); + Assert.assertNotNull(version); + Assert.assertTrue(version.getVersion().startsWith("1.2.0-")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ShutdownTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ShutdownTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ShutdownTest.java new file mode 100755 index 0000000..465eb5c --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/engine/ShutdownTest.java @@ -0,0 +1,90 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.test.engine; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State; +import org.apache.atlas.odf.api.engine.EngineManager; +import org.apache.atlas.odf.api.engine.ODFEngineOptions; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.core.controlcenter.ThreadManager; +import org.apache.atlas.odf.core.test.ODFTestBase; +import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest; + +public class ShutdownTest extends ODFTestBase { + + private void runAndTestThreads() throws Exception { + ODFAPITest.runRequestAndCheckResult("successID", State.FINISHED, -1); + ThreadManager tm = new ODFInternalFactory().create(ThreadManager.class); + int numThreads = tm.getNumberOfRunningThreads(); + log.info("--- Number of running threads: " + numThreads); + Assert.assertTrue(numThreads >= 3); + } + + @Test + public void testShutdown() throws Exception { + + log.info("--- Running some request before shutdown..."); + runAndTestThreads(); + + ThreadManager tm = new ODFInternalFactory().create(ThreadManager.class); + log.info("--- Number of threads before shutdown: " + tm.getNumberOfRunningThreads()); + + EngineManager engineManager = new ODFFactory().create().getEngineManager(); + ODFEngineOptions options = new ODFEngineOptions(); + options.setRestart(false); + int numThreads = tm.getNumberOfRunningThreads(); + log.info("--- Number of threads before restart: " + numThreads); + + engineManager.shutdown(options); + log.info("--- Shutdown requested..."); + int maxWait = 60; + int waitCnt = 0; + log.info("--- Shutdown requested, waiting for max " + maxWait + " seconds"); + while (tm.getNumberOfRunningThreads() > 0 && waitCnt < maxWait) { + waitCnt++; + Thread.sleep(1000); + } + log.info("--- Shutdown should be done by now, waited for " + waitCnt + " threads: " + tm.getNumberOfRunningThreads()); + 
Assert.assertNotEquals(waitCnt, maxWait); + + // log.info("--- Starting ODF again...."); + + // ODFInitializer.start(); + log.info("--- Rerunning request after shutdown..."); + runAndTestThreads(); + + int nrOfThreads = tm.getNumberOfRunningThreads(); + options.setRestart(true); + engineManager.shutdown(options); + maxWait = nrOfThreads * 2; + waitCnt = 0; + log.info("--- Restart requested..., wait for a maximum of " + (nrOfThreads * 2500) + " ms"); + while (tm.getNumberOfRunningThreads() > 0 && waitCnt < maxWait) { + waitCnt++; + Thread.sleep(1000); + } + log.info("--- Restart should be done by now"); + Thread.sleep(5000); + numThreads = tm.getNumberOfRunningThreads(); + log.info("--- Number of threads after restart: " + numThreads); + Assert.assertTrue(numThreads > 2); + log.info("--- testShutdown finished"); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/messaging/MockQueueManager.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/messaging/MockQueueManager.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/messaging/MockQueueManager.java new file mode 100755 index 0000000..c2be180 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/messaging/MockQueueManager.java @@ -0,0 +1,249 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.test.messaging; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager; +import org.apache.wink.json4j.JSONException; + +import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker; +import org.apache.atlas.odf.api.engine.MessagingStatus; +import org.apache.atlas.odf.core.controlcenter.AdminMessage; +import org.apache.atlas.odf.core.controlcenter.AdminQueueProcessor; +import org.apache.atlas.odf.core.controlcenter.ConfigChangeQueueProcessor; +import org.apache.atlas.odf.core.controlcenter.DefaultStatusQueueStore; +import org.apache.atlas.odf.core.controlcenter.DiscoveryServiceStarter; +import org.apache.atlas.odf.core.controlcenter.ExecutorServiceFactory; +import org.apache.atlas.odf.core.controlcenter.ODFRunnable; +import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor; +import org.apache.atlas.odf.core.controlcenter.ServiceRuntime; +import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes; +import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry; +import org.apache.atlas.odf.core.controlcenter.ThreadManager; +import org.apache.atlas.odf.core.controlcenter.ThreadManager.ThreadStartupResult; +import org.apache.atlas.odf.core.controlcenter.TrackerUtil; +import org.apache.atlas.odf.json.JSONUtils; + +public class MockQueueManager implements DiscoveryServiceQueueManager { + + static Logger logger = Logger.getLogger(MockQueueManager.class.getName()); + + static Object lock = new Object(); + + static List<AdminMessage> adminQueue 
= Collections.synchronizedList(new ArrayList<AdminMessage>()); + static List<StatusQueueEntry> statusQueue = Collections.synchronizedList(new ArrayList<StatusQueueEntry>()); + static Map<String, List<AnalysisRequestTracker>> runtimeQueues = new HashMap<>(); + + ThreadManager threadManager; + + public MockQueueManager() { + ODFInternalFactory factory = new ODFInternalFactory(); + ExecutorServiceFactory esf = factory.create(ExecutorServiceFactory.class); + threadManager = factory.create(ThreadManager.class); + threadManager.setExecutorService(esf.createExecutorService()); + //initialize(); + } + + @Override + public void start() throws TimeoutException { + logger.info("Initializing MockQueueManager"); + List<ThreadStartupResult> threads = new ArrayList<ThreadStartupResult>(); + ThreadStartupResult startUnmanagedThread = this.threadManager.startUnmanagedThread("MOCKADMIN", createQueueListener("Admin", adminQueue, new AdminQueueProcessor(), false)); + boolean threadCreated = startUnmanagedThread.isNewThreadCreated(); + threads.add(startUnmanagedThread); + startUnmanagedThread = this.threadManager.startUnmanagedThread("MOCKADMINCONFIGCHANGE", + createQueueListener("AdminConfig", adminQueue, new ConfigChangeQueueProcessor(), false)); + threadCreated |= startUnmanagedThread.isNewThreadCreated(); + threads.add(startUnmanagedThread); + startUnmanagedThread = this.threadManager.startUnmanagedThread("MOCKSTATUSSTORE", + createQueueListener("StatusStore", statusQueue, new DefaultStatusQueueStore.StatusQueueProcessor(), true)); + threadCreated |= startUnmanagedThread + .isNewThreadCreated(); + threads.add(startUnmanagedThread); + + logger.info("New thread created: " + threadCreated); + if (threadCreated) { + try { + this.threadManager.waitForThreadsToBeReady(5000, threads); + logger.info("All threads ready"); + } catch (TimeoutException e) { + final String message = "Not all thrads were created on time"; + logger.warning(message); + } + } + } + + @Override + public void stop() 
{ + threadManager.shutdownThreads(Arrays.asList("MOCKADMIN", "MOCKADMINCONFIGCHANGE", "MOCKSTATUSSTORE")); + } + + <T> T cloneObject(T obj) { + try { + return JSONUtils.cloneJSONObject(obj); + } catch (JSONException e) { + throw new RuntimeException(e); + } + } + + @Override + public void enqueue(AnalysisRequestTracker tracker) { + tracker = cloneObject(tracker); + DiscoveryServiceRequest dsRequest = TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker); + if (dsRequest == null) { + throw new RuntimeException("Tracker is finished, should not be enqueued"); + } + String dsID = dsRequest.getDiscoveryServiceId(); + dsRequest.setPutOnRequestQueue(System.currentTimeMillis()); + synchronized (lock) { + ServiceRuntime runtime = ServiceRuntimes.getRuntimeForDiscoveryService(dsID); + if (runtime == null) { + throw new RuntimeException(MessageFormat.format("Runtime of discovery service ''{0}'' does not exist", dsID)); + } + String runtimeName = runtime.getName(); + List<AnalysisRequestTracker> mq = runtimeQueues.get(runtimeName); + if (mq == null) { + mq = Collections.synchronizedList(new ArrayList<AnalysisRequestTracker>()); + runtimeQueues.put(runtimeName, mq); + } + boolean started = this.threadManager.startUnmanagedThread("MOCK" + runtimeName, createQueueListener("Starter" + runtimeName, mq, new DiscoveryServiceStarter(), false)) + .isNewThreadCreated(); + logger.info("New thread created for runtime " + runtimeName + ", started: " + started + ", current queue length: " + mq.size()); + mq.add(tracker); + } + } + + static class MockQueueListener implements ODFRunnable { + String name; + QueueMessageProcessor processor; + List<?> queue; + boolean cancelled = false; + ExecutorService service; + int index = 0; + + public MockQueueListener(String name, List<?> q, QueueMessageProcessor qmp, boolean fromBeginning) { + this.name = name; + this.processor = qmp; + this.queue = q; + if (fromBeginning) { + index = 0; + } else { + index = q.size(); + } + } + + long WAITTIMEMS = 
100; + + boolean isValidIndex() { + return index >= 0 && index < queue.size(); + } + + @Override + public void run() { + logger.info("MockQueueManager thread " + name + " started"); + + while (!cancelled) { + // logger.info("Queue consumer " + name + ": checking index " + index + " on queue of size " + queue.size()); + if (!isValidIndex()) { + try { + Thread.sleep(WAITTIMEMS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } else { + Object obj = queue.get(index); + String msg; + try { + msg = JSONUtils.toJSON(obj); + } catch (JSONException e) { + e.printStackTrace(); + cancelled = true; + return; + } + this.processor.process(service, msg, 0, index); + logger.finest("MockQConsumer " + name + ": Processed message: " + msg); + index++; + } + } + logger.info("MockQueueManager thread finished"); + + } + + + @Override + public void setExecutorService(ExecutorService service) { + this.service = service; + } + + @Override + public void cancel() { + cancelled = true; + } + + @Override + public boolean isReady() { + return true; + } + + } + + ODFRunnable createQueueListener(String name, List<?> queue, QueueMessageProcessor qmp, boolean fromBeginning) { + return new MockQueueListener(name, queue, qmp, fromBeginning); + } + + @Override + public void enqueueInStatusQueue(StatusQueueEntry sqe) { + sqe = cloneObject(sqe); + statusQueue.add(sqe); + } + + @Override + public void enqueueInAdminQueue(AdminMessage message) { + message = cloneObject(message); + adminQueue.add(message); + } + + public static class MockMessagingStatus extends MessagingStatus { + String message; + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + } + + @Override + public MessagingStatus getMessagingStatus() { + MockMessagingStatus mms = new MockMessagingStatus(); + mms.setMessage("OK"); + return mms; + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/NotificationManagerTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/NotificationManagerTest.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/NotificationManagerTest.java new file mode 100755 index 0000000..f69513c --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/NotificationManagerTest.java @@ -0,0 +1,72 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.test.notification; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.atlas.odf.api.OpenDiscoveryFramework; +import org.apache.atlas.odf.core.ODFInternalFactory; +import org.apache.atlas.odf.core.notification.NotificationListener; +import org.apache.atlas.odf.core.test.ODFTestBase; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS; +import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker; +import org.apache.atlas.odf.core.notification.NotificationManager; +import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest; + +public class NotificationManagerTest extends ODFTestBase { + + @Test + public void testNotifications() throws Exception { + NotificationManager nm = new ODFInternalFactory().create(NotificationManager.class); + Assert.assertNotNull(nm); + log.info("Notification manager found " + nm.getClass().getName()); + Assert.assertTrue(nm instanceof TestNotificationManager); + List<NotificationListener> listeners = nm.getListeners(); + Assert.assertTrue(listeners.size() > 0); + + OpenDiscoveryFramework odf = new ODFFactory().create(); + List<String> dataSetIDs = Collections.singletonList("successID"); + String id = ODFAPITest.runRequest(dataSetIDs, odf.getAnalysisManager()); + ODFAPITest.waitForRequest(id, odf.getAnalysisManager()); + + int polls = 20; + boolean found = false; + boolean foundFinished = false; + do { + // now check that trackers were found through the notification mechanism + log.info("Checking that trackers were consumed, " + polls + " seconds left"); + List<AnalysisRequestTracker> trackers = new ArrayList<>(TestNotificationManager.receivedTrackers); + log.info("Received trackers: " + trackers.size()); + for (AnalysisRequestTracker tracker : trackers) { + String foundId = tracker.getRequest().getId(); + if 
(foundId.equals(id)) { + found = true; + if (tracker.getStatus().equals(STATUS.FINISHED)) { + foundFinished = true; + } + } + } + polls--; + Thread.sleep(1000); + } while (!found && !foundFinished && polls > 0); + Assert.assertTrue(found); + Assert.assertTrue(foundFinished); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/TestNotificationManager.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/TestNotificationManager.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/TestNotificationManager.java new file mode 100755 index 0000000..80252d6 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/notification/TestNotificationManager.java @@ -0,0 +1,66 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.odf.core.test.notification; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.atlas.odf.api.OpenDiscoveryFramework; +import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker; +import org.apache.atlas.odf.core.controlcenter.StatusQueueEntry; +import org.apache.atlas.odf.core.notification.NotificationListener; +import org.apache.atlas.odf.core.notification.NotificationManager; +import org.apache.atlas.odf.json.JSONUtils; +import org.apache.wink.json4j.JSONException; + +public class TestNotificationManager implements NotificationManager { + + public static class TestListener1 implements NotificationListener { + + @Override + public String getTopicName() { + return "odf-status-topic"; + } + + @Override + public void onEvent(String event, OpenDiscoveryFramework odf) { + try { + StatusQueueEntry sqe = JSONUtils.fromJSON(event, StatusQueueEntry.class); + AnalysisRequestTracker tracker = sqe.getAnalysisRequestTracker(); + if (tracker != null) { + receivedTrackers.add(tracker); + } + } catch (JSONException e) { + throw new RuntimeException(e); + } + } + + @Override + public String getName() { + return this.getClass().getName(); + } + + } + + public static List<AnalysisRequestTracker> receivedTrackers = Collections.synchronizedList(new ArrayList<AnalysisRequestTracker>()); + + @Override + public List<NotificationListener> getListeners() { + List<NotificationListener> result = new ArrayList<>(); + result.add(new TestListener1()); + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/RuntimeExtensionTest.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/RuntimeExtensionTest.java 
b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/RuntimeExtensionTest.java new file mode 100755 index 0000000..8a8d9a8 --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/RuntimeExtensionTest.java @@ -0,0 +1,114 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.runtime; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.atlas.odf.api.ODFFactory; +import org.apache.atlas.odf.api.OpenDiscoveryFramework; +import org.apache.atlas.odf.api.analysis.AnalysisRequest; +import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus.State; +import org.apache.atlas.odf.api.analysis.AnalysisResponse; +import org.apache.atlas.odf.core.controlcenter.ServiceRuntime; +import org.apache.atlas.odf.core.controlcenter.ServiceRuntimes; +import org.apache.atlas.odf.core.test.ODFTestBase; +import org.apache.atlas.odf.core.test.controlcenter.ODFAPITest; + +public class RuntimeExtensionTest extends ODFTestBase { + + static final String SERVICE_ON_TEST_RUNTIME = "testruntimeservice"; + + List<String> getNames(List<ServiceRuntime> rts) { + List<String> result = new ArrayList<>(); + for (ServiceRuntime rt : rts) { + result.add(rt.getName()); + } + return result; + } + + @Test + public void testActiveRuntimes() { + List<String> allNames = getNames(ServiceRuntimes.getAllRuntimes()); 
+ Assert.assertTrue(allNames.contains(TestServiceRuntime.TESTSERVICERUNTIME_NAME)); + + List<String> activeNames = getNames(ServiceRuntimes.getActiveRuntimes()); + Assert.assertTrue(activeNames.contains(TestServiceRuntime.TESTSERVICERUNTIME_NAME)); + } + + @Test + public void testRuntimeForNewService() { + ServiceRuntime rt = ServiceRuntimes.getRuntimeForDiscoveryService(SERVICE_ON_TEST_RUNTIME); + Assert.assertNotNull(rt); + Assert.assertEquals(TestServiceRuntime.TESTSERVICERUNTIME_NAME, rt.getName()); + } + + static Object lock = new Object(); + + @Test + public void testRuntimeExtensionSimple() throws Exception { + synchronized (lock) { + OpenDiscoveryFramework odf = new ODFFactory().create(); + TestServiceRuntime.runtimeBlocked = false; + AnalysisRequest request = ODFAPITest.createAnalysisRequest(Collections.singletonList(ODFAPITest.DUMMY_SUCCESS_ID)); + request.setDiscoveryServiceSequence(Collections.singletonList(SERVICE_ON_TEST_RUNTIME)); + log.info("Starting service for test runtime"); + AnalysisResponse resp = odf.getAnalysisManager().runAnalysis(request); + String requestId = resp.getId(); + Assert.assertTrue(ODFAPITest.waitForRequest(requestId, odf.getAnalysisManager(), 40, State.FINISHED)); + Assert.assertTrue(TestServiceRuntime.requests.contains(requestId)); + log.info("testRuntimeExtensionSimple finished"); + + // block runtime again to restore state before testcase + TestServiceRuntime.runtimeBlocked = true; + Thread.sleep(5000); + } + } + + @Test + public void testBlockedRuntimeExtension() throws Exception { + synchronized (lock) { + OpenDiscoveryFramework odf = new ODFFactory().create(); + TestServiceRuntime.runtimeBlocked = true; + AnalysisRequest request = ODFAPITest.createAnalysisRequest(Collections.singletonList(ODFAPITest.DUMMY_SUCCESS_ID)); + request.setDiscoveryServiceSequence(Collections.singletonList(SERVICE_ON_TEST_RUNTIME)); + log.info("Starting service for test runtime"); + AnalysisResponse resp = 
odf.getAnalysisManager().runAnalysis(request); + String requestId = resp.getId(); + Assert.assertFalse(resp.isInvalidRequest()); + log.info("Checking that service is not called"); + for (int i = 0; i < 5; i++) { + Assert.assertFalse(TestServiceRuntime.requests.contains(requestId)); + Thread.sleep(1000); + } + log.info("Unblocking runtime..."); + TestServiceRuntime.runtimeBlocked = false; + Thread.sleep(5000); // give service time to start consumption + log.info("Checking that request has finished"); + Assert.assertTrue(ODFAPITest.waitForRequest(requestId, odf.getAnalysisManager(), 40, State.FINISHED)); + log.info("Checking that service was called"); + Assert.assertTrue(TestServiceRuntime.requests.contains(requestId)); + log.info("testBlockedRuntimeExtension finished"); + + // block runtime again to restore state before testcase + TestServiceRuntime.runtimeBlocked = true; + Thread.sleep(5000); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/TestServiceRuntime.java ---------------------------------------------------------------------- diff --git a/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/TestServiceRuntime.java b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/TestServiceRuntime.java new file mode 100755 index 0000000..d16e10a --- /dev/null +++ b/odf/odf-core/src/test/java/org/apache/atlas/odf/core/test/runtime/TestServiceRuntime.java @@ -0,0 +1,80 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.odf.core.test.runtime; + +import java.util.HashSet; +import java.util.Set; +import java.util.logging.Logger; + +import org.apache.atlas.odf.api.discoveryservice.DiscoveryService; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest; +import org.apache.atlas.odf.api.discoveryservice.SyncDiscoveryServiceBase; +import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse; +import org.apache.atlas.odf.api.settings.validation.ValidationException; +import org.apache.atlas.odf.core.controlcenter.ServiceRuntime; +import org.apache.atlas.odf.core.test.ODFTestLogger; +import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse; + +public class TestServiceRuntime implements ServiceRuntime { + + static Logger logger = ODFTestLogger.get(); + + public static final String TESTSERVICERUNTIME_NAME = "TestServiceRuntime"; + + public static boolean runtimeBlocked = true; + + @Override + public String getName() { + return TESTSERVICERUNTIME_NAME; + } + + @Override + public long getWaitTimeUntilAvailable() { + if (runtimeBlocked) { + return 1000; + } + return 0; + } + + public static Set<String> requests = new HashSet<>(); + + public static class DSProxy extends SyncDiscoveryServiceBase { + + @Override + public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) { + logger.info("Running test runtime service"); + requests.add(request.getOdfRequestId()); + DiscoveryServiceSyncResponse resp = new DiscoveryServiceSyncResponse(); + resp.setCode(DiscoveryServiceResponse.ResponseCode.OK); + resp.setDetails("Test success"); + return resp; + } + } + + @Override + public DiscoveryService createDiscoveryServiceProxy(DiscoveryServiceProperties props) { + return new DSProxy(); + } + + @Override + public 
String getDescription() { + return "TestServiceRuntime description"; + } + + @Override + public void validate(DiscoveryServiceProperties props) throws ValidationException { + } + +}
