http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ControlCenter.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ControlCenter.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ControlCenter.java
new file mode 100755
index 0000000..4ffa195
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ControlCenter.java
@@ -0,0 +1,454 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.OpenDiscoveryFramework;
+import org.apache.atlas.odf.api.analysis.*;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import 
org.apache.atlas.odf.api.discoveryservice.async.DiscoveryServiceAsyncStartResponse;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.api.metadata.AnnotationPropagator;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.api.metadata.models.UnknownDataSet;
+import org.apache.atlas.odf.core.Encryption;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisCancelResult;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestStatus;
+import org.apache.atlas.odf.api.analysis.AnalysisResponse;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.ServiceNotFoundException;
+import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService;
+import org.apache.atlas.odf.core.Utils;
+
+public class ControlCenter {
+
+       private static final String CLASSNAME = ControlCenter.class.getName();
+       private Logger logger = Logger.getLogger(ControlCenter.class.getName());
+
+       public static final String HEALTH_TEST_DISCOVERY_SERVICE_ID = 
"odf-health-test-discovery-service-id";
+       public static final String HEALTH_TEST_DATA_SET_ID_PREFIX = 
"odf-health-test-dummy-data-set-id";
+
+       DiscoveryServiceQueueManager queueManager = null;
+       AnalysisRequestTrackerStore store = null;
+       Environment environment = null;
+       OpenDiscoveryFramework odf;
+
+       public ControlCenter() {
+               ODFInternalFactory f = new ODFInternalFactory();
+               queueManager = f.create(DiscoveryServiceQueueManager.class);
+               store = f.create(AnalysisRequestTrackerStore.class);
+               odf = new ODFFactory().create();
+               environment = f.create(Environment.class);
+       }
+
+       private String createNewRequestId() {
+               return "odf-request-" + UUID.randomUUID().toString() + "_" + 
System.currentTimeMillis();
+       }
+
+       public DiscoveryServiceQueueManager getQueueManager() {
+               return queueManager;
+       }
+
+       public AnalysisResponse startRequest(AnalysisRequest request) {
+               final String METHODNAME = "startRequest()";
+               logger.entering(CLASSNAME, METHODNAME);
+               AnalysisResponse response = new AnalysisResponse();
+               AnalysisRequest requestWithServiceSequence = null;
+               try {
+                       requestWithServiceSequence = 
JSONUtils.fromJSON(JSONUtils.toJSON(request), AnalysisRequest.class);
+               } catch (JSONException e) {
+                       throw new RuntimeException("Error cloning analysis 
request.");
+               }
+               if ((request.getDiscoveryServiceSequence() == null) || 
request.getDiscoveryServiceSequence().isEmpty()) {
+                       DeclarativeRequestMapper mapper = new 
DeclarativeRequestMapper(request);
+                       List<String> discoveryServiceSequence = 
mapper.getRecommendedDiscoveryServiceSequence();
+                       logger.log(Level.INFO, "Using discovery service 
sequence: " + Utils.joinStrings(discoveryServiceSequence, ','));
+                       if (discoveryServiceSequence == null) {
+                               response.setId(request.getId());
+                               response.setInvalidRequest(true);
+                               response.setDetails("No suitable discovery 
services found to create the requested annotation types.");
+                               return response;
+                       }
+                       
requestWithServiceSequence.setDiscoveryServiceSequence(discoveryServiceSequence);
+               }
+               try {
+                       //Initialize queues to make sure analysis can be started
+                       queueManager.start();
+               } catch (TimeoutException e) {
+                       logger.warning("queues could not be started in time");
+               }
+               AnalysisRequestTracker similarTracker = 
store.findSimilarQueuedRequest(requestWithServiceSequence);
+               if (similarTracker != null) {
+                       logger.log(Level.WARNING, "A similar request for the 
issued one is already in the queue.");
+                       logger.log(Level.FINE, "A similar request for the 
issued one is already in the queue. Original request: {0}, found similar 
request: {1}",
+                                       new Object[] { 
JSONUtils.lazyJSONSerializer(requestWithServiceSequence),
+                                       
JSONUtils.lazyJSONSerializer(similarTracker) });
+               }
+               String newRequestId = createNewRequestId();
+               response.setId(newRequestId);
+               requestWithServiceSequence.setId(newRequestId);
+               AnalysisRequestTracker tracker = 
createTracker(requestWithServiceSequence, response);
+               // if request is invalid, response was already modified and 
null is returned
+               if (tracker != null) {
+                       
tracker.setStatus(AnalysisRequestTrackerStatus.STATUS.IN_DISCOVERY_SERVICE_QUEUE);
+                       logger.log(Level.FINE, "Starting new request with ID 
''{0}''. Tracker: {1}", new Object[] { newRequestId, 
JSONUtils.lazyJSONSerializer(tracker) });
+                       store.store(tracker);
+                       logger.log(Level.FINEST, "Stored tracker for new 
request with ID ''{0}''. Tracker: {1}", new Object[] { newRequestId, 
JSONUtils.lazyJSONSerializer(tracker) });
+                       queueManager.enqueue(tracker);
+                       logger.log(Level.FINEST, "Tracker enqueued for new 
request with ID ''{0}''. Tracker: {1}", new Object[] { newRequestId, 
JSONUtils.lazyJSONSerializer(tracker) });
+               }
+               logger.exiting(CLASSNAME, METHODNAME);
+               return response;
+       }
+
+       public AnalysisRequestStatus getRequestStatus(String requestId) {
+               final String METHODNAME = "getRequestStatus(String)";
+               logger.entering(CLASSNAME, METHODNAME);
+               AnalysisRequestStatus result = new AnalysisRequestStatus();
+               AnalysisRequestTracker tracker = store.query(requestId);
+               if (tracker == null) {
+                       result.setState(AnalysisRequestStatus.State.NOT_FOUND);
+               } else {
+                       AnalysisRequestStatus.State state = null;
+                       switch (tracker.getStatus()) {
+                       case INITIALIZED:
+                       case IN_DISCOVERY_SERVICE_QUEUE:
+                               state = AnalysisRequestStatus.State.QUEUED;
+                               break;
+                       case ERROR:
+                               state = AnalysisRequestStatus.State.ERROR;
+                               break;
+                       case DISCOVERY_SERVICE_RUNNING:
+                               state = AnalysisRequestStatus.State.ACTIVE;
+                               break;
+                       case FINISHED:
+                               state = AnalysisRequestStatus.State.FINISHED;
+                               break;
+                       case CANCELLED:
+                               state = AnalysisRequestStatus.State.CANCELLED;
+                       default:
+                               ;
+                       }
+                       result.setState(state);
+                       result.setDetails(tracker.getStatusDetails());
+                       result.setRequest(tracker.getRequest());
+
+                       long totalProcessingTime = 0;
+                       long totalQueuingTime = 0;
+                       long totalTimeSpentStoringAnnotations = 0;
+
+                       List<DiscoveryServiceRequest> requests = new 
ArrayList<DiscoveryServiceRequest>();
+                       for (DiscoveryServiceRequest req : 
tracker.getDiscoveryServiceRequests()) {
+                               DiscoveryServiceRequest copyReq = new 
DiscoveryServiceRequest();
+                               
copyReq.setDiscoveryServiceId(req.getDiscoveryServiceId());
+                               long putOnQueue = req.getPutOnRequestQueue();
+                               long startedProcessing = 
req.getTakenFromRequestQueue();
+                               long finishedProcessing = 
req.getFinishedProcessing();
+
+                               totalProcessingTime += (finishedProcessing > 0 
? finishedProcessing - startedProcessing : finishedProcessing);
+                               totalQueuingTime += (startedProcessing > 0 ? 
startedProcessing - putOnQueue : startedProcessing);
+                               totalTimeSpentStoringAnnotations += 
req.getTimeSpentStoringResults();
+
+                               
copyReq.setFinishedProcessing(finishedProcessing);
+                               copyReq.setPutOnRequestQueue(putOnQueue);
+                               
copyReq.setTakenFromRequestQueue(startedProcessing);
+                               requests.add(copyReq);
+                       }
+
+                       result.setTotalTimeOnQueues(totalQueuingTime);
+                       result.setTotalTimeProcessing(totalProcessingTime);
+                       
result.setTotalTimeStoringAnnotations(totalTimeSpentStoringAnnotations);
+                       result.setServiceRequests(requests);
+               }
+               logger.log(Level.FINE, "Returning request status object {0}", 
JSONUtils.lazyJSONSerializer(result));
+               logger.exiting(CLASSNAME, METHODNAME);
+               return result;
+       }
+
+       public AnalysisCancelResult cancelRequest(String requestId) {
+               final String METHODNAME = "cancelRequest(String)";
+               logger.entering(CLASSNAME, METHODNAME);
+
+               AnalysisCancelResult result = new AnalysisCancelResult();
+               result.setState(AnalysisCancelResult.State.NOT_FOUND);
+
+               AnalysisRequestTracker request = store.query(requestId);
+               //TODO implement cancellation of running instead of only queued 
requests.
+               if (request != null) {
+                       if (TrackerUtil.isCancellable(request)) {
+                               
request.setStatus(AnalysisRequestTrackerStatus.STATUS.CANCELLED);
+                               store.store(request);
+                               logger.info("cancelled request with id " + 
requestId);
+                               
result.setState(AnalysisCancelResult.State.SUCCESS);
+                       } else {
+                               logger.log(Level.FINER, "Request ''{0}'' could 
not be cancelled. State ''{1}'', next request number:. ''{2}''", new 
Object[]{requestId, request.getStatus(), 
request.getNextDiscoveryServiceRequest()});
+                               
result.setState(AnalysisCancelResult.State.INVALID_STATE);
+                       }
+               }
+               logger.exiting(CLASSNAME, METHODNAME);
+               return result;
+       }
+
+       private AnalysisRequestTracker createTracker(AnalysisRequest request, 
AnalysisResponse response) {
+               DiscoveryServiceManager discoveryServiceManager = 
odf.getDiscoveryServiceManager();
+               List<DiscoveryServiceProperties> registeredServices = new 
ArrayList<>(discoveryServiceManager.getDiscoveryServicesProperties());
+               
registeredServices.add(HealthCheckServiceRuntime.getHealthCheckServiceProperties());
+               String currentUser = this.environment.getCurrentUser();
+
+               /*
+               List<MetaDataObjectReference> datasets = request.getDataSets();
+               
+               if (datasets.size() == 1 && 
datasets.get(0).getId().startsWith(HEALTH_TEST_DATA_SET_ID_PREFIX)) {
+                       // health test mode
+                       AnalysisRequestTracker healthTestTracker = new 
AnalysisRequestTracker();
+                       DiscoveryServiceRequest dssr = new 
DiscoveryServiceRequest();
+                       dssr.setOdfRequestId(request.getId());
+                       
dssr.setDiscoveryServiceId(ControlCenter.HEALTH_TEST_DISCOVERY_SERVICE_ID);
+                       String odfUrl = new 
ODFFactory().create().getSettingsManager().getODFSettings().getOdfUrl();
+                       dssr.setOdfUrl(odfUrl);
+                       MetaDataObjectReference dsr = datasets.get(0);
+                       
+                       DataSetContainer dataSetContainer = new 
DataSetContainer();
+                       DataSet oMDataSet = new UnknownDataSet();       
+                       oMDataSet.setReference(dsr);
+                       dataSetContainer.setDataSet(oMDataSet);
+                       
+                       dssr.setDataSetContainer(dataSetContainer);
+                       dssr.setUser(currentUser);
+                       
dssr.setAdditionalProperties(request.getAdditionalProperties());
+                       
healthTestTracker.setDiscoveryServiceRequests(Collections.singletonList(dssr));
+                       healthTestTracker.setRequest(request);
+                       healthTestTracker.setStatus(STATUS.INITIALIZED);
+                       Utils.setCurrentTimeAsLastModified(healthTestTracker);
+                       healthTestTracker.setUser(currentUser);
+                       response.setDetails("Request is a special health test 
request.");
+                       return healthTestTracker;
+               }
+               */
+
+               List<DiscoveryServiceRequest> startRequests = new 
ArrayList<DiscoveryServiceRequest>();
+               List<String> discoveryServiceSequence = 
request.getDiscoveryServiceSequence();
+               if (discoveryServiceSequence != null && 
!discoveryServiceSequence.isEmpty()) {
+                       logger.log(Level.FINE, "Request issued with fixed 
discovery service sequence: {0}", discoveryServiceSequence);
+                       // first check if discoveryService IDs are valid
+                       Set<String> foundDSs = new 
HashSet<String>(discoveryServiceSequence);
+                       for (String ds : discoveryServiceSequence) {
+                               for (DiscoveryServiceProperties regInfo : 
registeredServices) {
+                                       if (regInfo.getId().equals(ds)) {
+                                               foundDSs.remove(ds);
+                                       }
+                               }
+                       }
+                       // if there are some IDs left that were not found 
+                       if (!foundDSs.isEmpty()) {
+                               String msg = MessageFormat.format("The 
discovery services {0} could not be found", Utils.collectionToString(foundDSs, 
","));
+                               logger.log(Level.WARNING, msg);
+                               response.setInvalidRequest(true);
+                               response.setDetails(msg);
+                               return null;
+                       }
+
+                       // for each data set process all discovery services
+                       // (possible alternative, not used here: for all 
discovery services process each data set)
+                       for (MetaDataObjectReference dataSetId : 
request.getDataSets()) {
+                               MetaDataObject mdo = null;
+                               if 
(dataSetId.getId().startsWith(HEALTH_TEST_DATA_SET_ID_PREFIX)) {
+                                       mdo = new UnknownDataSet();
+                                       mdo.setReference(dataSetId);
+                               } else {
+                                       mdo = 
odf.getMetadataStore().retrieve(dataSetId);
+                               }
+                               if (mdo == null) {
+                                       String msg = MessageFormat.format("The 
meta data object id ''{0}'' does not reference an existing metadata object. 
Request will be set to error.", dataSetId.toString());
+                                       logger.log(Level.WARNING, msg);
+                                       response.setInvalidRequest(true);
+                                       response.setDetails(msg);
+                                       return null;
+                               }
+                               if (dataSetId.getUrl() == null) {
+                                       
dataSetId.setUrl(mdo.getReference().getUrl());
+                               }
+                               for (String ds : discoveryServiceSequence) {
+                                       DiscoveryServiceRequest req = new 
DiscoveryServiceRequest();
+                                       DataSetContainer dataSetContainer = new 
DataSetContainer();
+                                       dataSetContainer.setDataSet(mdo);
+                                       
req.setDataSetContainer(dataSetContainer);
+                                       req.setOdfRequestId(request.getId());
+                                       req.setDiscoveryServiceId(ds);
+                                       req.setUser(currentUser);
+                                       
req.setAdditionalProperties(request.getAdditionalProperties());
+                                       String odfUrl = 
odf.getSettingsManager().getODFSettings().getOdfUrl();
+                                       req.setOdfUrl(odfUrl);
+                                       for (DiscoveryServiceProperties dsri : 
odf.getDiscoveryServiceManager().getDiscoveryServicesProperties()) {
+                                               if (dsri.getId().equals(ds)) {
+                                                       if 
(dsri.getEndpoint().getRuntimeName().equals(SparkServiceRuntime.SPARK_RUNTIME_NAME))
 {
+                                                               
req.setOdfUser(odf.getSettingsManager().getODFSettings().getOdfUser());
+                                                               //Note that the 
password has to be provided as plain text here because the remote service 
cannot decrypt it otherwise.
+                                                               //TODO: 
Consider to provide a temporary secure token instead of the password.
+                                                               
req.setOdfPassword(Encryption.decryptText(odf.getSettingsManager().getODFSettings().getOdfPassword()));
+                                                       }
+                                               }
+                                       }
+                                       startRequests.add(req);
+                               }
+                       }
+               } else {
+                       String msg = "The request didn't contain any processing 
hints. ODF cannot process a request without an analysis sequence.";
+                       logger.log(Level.WARNING, msg);
+                       response.setInvalidRequest(true);
+                       response.setDetails(msg);
+                       return null;
+               }
+
+               AnalysisRequestTracker tracker = new AnalysisRequestTracker();
+               tracker.setDiscoveryServiceRequests(startRequests);
+               tracker.setNextDiscoveryServiceRequest(0);
+               tracker.setRequest(request);
+               
tracker.setStatus(AnalysisRequestTrackerStatus.STATUS.INITIALIZED);
+               Utils.setCurrentTimeAsLastModified(tracker);
+               tracker.setUser(currentUser);
+               return tracker;
+       }
+       
+       boolean requiresMetaDataCache(DiscoveryService service) {
+               return service instanceof SparkDiscoveryServiceProxy;
+       }
+
+       public static SyncDiscoveryService getDiscoveryServiceProxy(String 
discoveryServiceId, AnalysisRequest request) {
+               try {
+                       ODFInternalFactory factory = new ODFInternalFactory();
+                       DiscoveryServiceManager dsm = 
factory.create(DiscoveryServiceManager.class);
+                       DiscoveryServiceProperties serviceProps = null;
+                       if 
(discoveryServiceId.startsWith(HEALTH_TEST_DISCOVERY_SERVICE_ID)) {
+                               serviceProps = 
HealthCheckServiceRuntime.getHealthCheckServiceProperties();
+                       } else {
+                               serviceProps = 
dsm.getDiscoveryServiceProperties(discoveryServiceId);
+                       }
+                       ServiceRuntime runtime = 
ServiceRuntimes.getRuntimeForDiscoveryService(discoveryServiceId);
+                       if (runtime == null) {
+                               throw new 
RuntimeException(MessageFormat.format("Service runtime for service ''{0}'' was 
not found.", discoveryServiceId));
+                       }
+                       DiscoveryService runtimeProxy = 
runtime.createDiscoveryServiceProxy(serviceProps);
+                       SyncDiscoveryService proxy = null;
+                       if (runtimeProxy instanceof AsyncDiscoveryService) {
+                               proxy = new AsyncDiscoveryServiceWrapper( 
(AsyncDiscoveryService) runtimeProxy);
+                       } else {
+                               proxy = (SyncDiscoveryService) runtimeProxy;
+                       }
+                       
proxy.setMetadataStore(factory.create(MetadataStore.class));
+                       AnnotationStore as = 
factory.create(AnnotationStore.class);
+                       if (request != null) {
+                               as.setAnalysisRun(request.getId());
+                       }
+                       proxy.setAnnotationStore(as);
+                       return proxy;
+               } catch (ServiceNotFoundException exc) {
+                       throw new RuntimeException(exc);
+               }
+       }
+
+       /**
+        * package private helper method that can be called when the current 
discovery service was finished
+        * and you want to advance to the next.
+        * NOTE: This should only be called once for all nodes, i.e., typically 
from a Kafka consumer
+        *       that has runs on all nodes with the same consumer group ID.
+        * 
+        * @param dsRunID runID is just used for logging, could be any value
+        * @param dsID
+        */
+       void advanceToNextDiscoveryService(final AnalysisRequestTracker 
tracker) {
+               DiscoveryServiceRequest req = 
TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
+               DiscoveryServiceResponse resp = 
TrackerUtil.getCurrentDiscoveryServiceStartResponse(tracker);
+               String dsRunID = "N/A";
+               if (resp instanceof DiscoveryServiceAsyncStartResponse) {
+                       dsRunID = ((DiscoveryServiceAsyncStartResponse) 
resp).getRunId();
+               }
+               String dsID = req.getDiscoveryServiceId();
+
+               TrackerUtil.moveToNextDiscoveryService(tracker);
+               DiscoveryServiceRequest nextDSReq = 
TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
+               if (nextDSReq == null) {
+                       logger.log(Level.FINER, "DSWatcher: Run ''{0}'' of DS 
''{1}'' was last of request ''{2}'', marking overall request as finished",
+                                       new Object[] { dsRunID, dsID, 
tracker.getRequest().getId() });
+                       // overall request is finished
+                       
tracker.setStatus(AnalysisRequestTrackerStatus.STATUS.FINISHED);
+                       tracker.setStatusDetails("All discovery services ran 
successfully");
+                       
+                       // now propagate annotations if configured
+                       logger.log(Level.FINE, "Request is finished, checking 
for annotation propagation");
+                       Boolean doPropagation = 
odf.getSettingsManager().getODFSettings().getEnableAnnotationPropagation();
+                       if (Boolean.TRUE.equals(doPropagation)) {
+                               TransactionContextExecutor 
transactionContextExecutor = new 
ODFInternalFactory().create(TransactionContextExecutor.class);
+                               try {
+                                       
transactionContextExecutor.runInTransactionContext(new Callable<Object>() {
+                                               
+                                               @Override
+                                               public Object call() throws 
Exception {
+                                                       AnnotationPropagator ap 
= odf.getMetadataStore().getAnnotationPropagator();
+                                                       if (ap != null) {
+                                                               
logger.log(Level.FINE, "Annotation Propagator exists, running propagation");
+                                                               try {
+                                                                       
ap.propagateAnnotations(new ODFFactory().create().getAnnotationStore(), 
tracker.getRequest().getId());
+                                                               } 
catch(Exception exc) {
+                                                                       
logger.log(Level.SEVERE, "An unexcepted exception occurred while propagating 
annotations", exc);
+                                                                       
tracker.setStatus(AnalysisRequestTrackerStatus.STATUS.ERROR);
+                                                                       String 
msg = MessageFormat.format("An unexpected exception occured while propagating 
annotations: ''{0}''", Utils.getExceptionAsString(exc));
+                                                                       
tracker.setStatusDetails(msg);
+                                                               }
+                                                       }
+                                                       return null;
+                                               }
+                                       });
+                               } catch (Exception e) {
+                                       // should never happen as exception is 
handled inside the callable
+                                       throw new RuntimeException(e);
+                               }
+                       }
+               } else {
+                       logger.log(Level.FINER, "DSWatcher: Run ''{0}'' of DS 
''{1}'' was not the last of request ''{2}'', moving over to next request",
+                                       new Object[] { dsRunID, dsID, 
tracker.getRequest().getId() });
+                       
tracker.setStatus(AnalysisRequestTrackerStatus.STATUS.IN_DISCOVERY_SERVICE_QUEUE);
+                       queueManager.enqueue(tracker);
+               }
+               Utils.setCurrentTimeAsLastModified(tracker);
+               store.store(tracker);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DeclarativeRequestMapper.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DeclarativeRequestMapper.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DeclarativeRequestMapper.java
new file mode 100755
index 0000000..9b16270
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DeclarativeRequestMapper.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import java.text.MessageFormat;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceManager;
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+
+/**
+*
+* Maps a list of {@link AnnotationType} objects to a list of service ids 
representing concrete discovery
+* services that generate the requested annotation types.
+* 
+* Internally, this class generates a list of all possible combinations of 
discovery services which may be
+* used to generate the requested annotation types. The combinations are then 
assessed and ordered by the
+* expected execution effort and the one with the least execution effort is 
provided. 
+*
+*/
+public class DeclarativeRequestMapper {
+
+       private Logger logger = 
Logger.getLogger(DeclarativeRequestMapper.class.getName());
+
+       DiscoveryServiceManager dsManager = new 
ODFFactory().create().getDiscoveryServiceManager();
+       List<DiscoveryServiceProperties> dsPropList = 
dsManager.getDiscoveryServicesProperties();
+
+       private List<DiscoveryServiceSequence> discoveryServiceSequences = new 
ArrayList<DiscoveryServiceSequence>();
+
+       public DeclarativeRequestMapper(AnalysisRequest request) {
+               String messageText = "Generating possible discovery service 
sequences for annotation types {0}.";
+               logger.log(Level.INFO, MessageFormat.format(messageText, 
request.getAnnotationTypes()));
+
+               this.discoveryServiceSequences = 
calculateDiscoveryServiceSequences(request.getAnnotationTypes());
+               Collections.sort(this.discoveryServiceSequences, new 
EffortComparator());
+       }
+
+       /**
+       *
+       * Represents a single discovery service sequence.
+       *
+       */
+       public class DiscoveryServiceSequence {
+               private LinkedHashSet<String> serviceSequence;
+
+               public DiscoveryServiceSequence() {
+                       this.serviceSequence = new LinkedHashSet<String>();
+               }
+
+               public DiscoveryServiceSequence(LinkedHashSet<String> 
serviceIds) {
+                       this.serviceSequence = serviceIds;
+               }
+
+               public LinkedHashSet<String> getServiceSequence() {
+                       return this.serviceSequence;
+               }
+
+               public List<String> getServiceSequenceAsList() {
+                       return new ArrayList<String>(this.serviceSequence);
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if ((obj == null) || !(obj instanceof 
DiscoveryServiceSequence)) {
+                               return false;
+                       }
+                       return 
this.getServiceSequence().equals(((DiscoveryServiceSequence) 
obj).getServiceSequence());
+               }
+
+               // Overriding hashCode method to ensure proper results of 
equals() method
+               // (See of 
http://www.javaranch.com/journal/2002/10/equalhash.html)
+               @Override
+               public int hashCode() {
+                       return Utils.joinStrings(new 
ArrayList<String>(this.serviceSequence), ',').hashCode();
+               }
+       }
+
+       /**
+       *
+       * Internal class that estimates the effort for executing a sequence of 
discovery services.
+       * Should be extended to take runtime statistics into account. 
+       *
+       */
+       private class EffortComparator implements 
Comparator<DiscoveryServiceSequence> {
+               public int compare(DiscoveryServiceSequence da1, 
DiscoveryServiceSequence da2) {
+                       if (da1.getServiceSequence().size() < 
da2.getServiceSequence().size()) {
+                               return -1;
+                       } else if (da1.getServiceSequence().size() > 
da2.getServiceSequence().size()) {
+                               return 1;
+                       } else {
+                               return 0;
+                       }
+               }
+       }
+
+       /**
+        * Returns the calculated list of discovery service sequences ordered 
by the execution effort,
+        * starting with the sequence that is supposed to cause the minimum 
execution effort.
+        *
+        * @return List of discovery service sequences
+        */
+       public List<DiscoveryServiceSequence> getDiscoveryServiceSequences() {
+               return this.discoveryServiceSequences;
+       }
+
+       /**
+        * Returns recommended discovery service sequence, i.e. the one that is 
supposed to cause the
+        * minimum execution effort.
+        *
+        * @return Discovery service sequence
+        */
+       public List<String> getRecommendedDiscoveryServiceSequence() {
+               if (!getDiscoveryServiceSequences().isEmpty()) {
+                       return new 
ArrayList<String>(this.discoveryServiceSequences.get(0).getServiceSequence());
+               } else {
+                       return null;
+               }
+       }
+
+       /**
+        * Remove all discovery service sequences that contain a specific 
service id. Use this method
+        * to update the list of discovery service sequences after a specific 
discovery service has
+        * failed and should not be used any more.
+        *
+        * @param serviceId Id of discovery service to be removed
+        * @return Discovery service sequence
+        */
+       public boolean removeDiscoveryServiceSequences(String serviceId) {
+               boolean serviceRemoved = false;
+               List<DiscoveryServiceSequence> updatedList = new 
ArrayList<DiscoveryServiceSequence>();
+               updatedList.addAll(this.discoveryServiceSequences);
+               for (DiscoveryServiceSequence sequence : 
this.discoveryServiceSequences) {
+                       if (sequence.getServiceSequence().contains(serviceId)) {
+                               updatedList.remove(sequence);
+                               serviceRemoved = true;
+                       }
+               }
+               this.discoveryServiceSequences = updatedList;
+               return serviceRemoved ? true : false;
+       }
+
+       /**
+        * Internal method that determines all possible sequences of discovery 
services which could be used
+        * to generate the requested annotation type. Using recursion, all 
levels of prerequisites are taken
+        * into account.
+        *
+        * @param annotationType Annotation type to be generated
+        * @return List of discovery service sequences that generate the 
requested annotation type
+        */
+       private List<DiscoveryServiceSequence> 
getDiscoveryServiceSequencesForAnnotationType(String annotationType) {
+               List<DiscoveryServiceSequence> result = new 
ArrayList<DiscoveryServiceSequence>();
+               for (DiscoveryServiceProperties dsProps : this.dsPropList) {
+                       if ((dsProps.getResultingAnnotationTypes() != null) && 
dsProps.getResultingAnnotationTypes().contains(annotationType)) {
+                               DiscoveryServiceSequence da = new 
DiscoveryServiceSequence();
+                               da.getServiceSequence().add(dsProps.getId());
+                               List<DiscoveryServiceSequence> 
discoveryApproachesForService = new ArrayList<DiscoveryServiceSequence>();
+                               discoveryApproachesForService.add(da);
+
+                               // If there are prerequisite annotation types, 
also merge their services into the result
+                               if ((dsProps.getPrerequisiteAnnotationTypes() 
!= null)
+                                               && 
!dsProps.getPrerequisiteAnnotationTypes().isEmpty()) {
+                                       discoveryApproachesForService = 
combineDiscoveryServiceSequences(
+                                                       
calculateDiscoveryServiceSequences(dsProps.getPrerequisiteAnnotationTypes()),
+                                                       
discoveryApproachesForService);
+                                       ;
+                               }
+                               logger.log(Level.INFO, "Discovery appoaches for 
annotationType " + annotationType + ":");
+                               for 
(DeclarativeRequestMapper.DiscoveryServiceSequence discoveryApproach : 
discoveryApproachesForService) {
+                                       logger.log(Level.INFO,
+                                                       Utils.joinStrings(new 
ArrayList<String>(discoveryApproach.getServiceSequence()), ','));
+                               }
+
+                               result.addAll(discoveryApproachesForService);
+                       }
+               }
+               return result;
+       }
+
+       /**
+        * Internal method that combines two lists of discovery service 
sequences by generating all possible
+        * combinations of the entries of both lists. The methods avoids 
duplicate services in each sequence
+        * and duplicate sequences in the resulting list.
+        *
+        * @param originalSequences Original list of discovery service sequences
+        * @param additionalSequences Second list discovery service sequences
+        * @return Combined list of discovery service sequences
+        */
+       private List<DiscoveryServiceSequence> 
combineDiscoveryServiceSequences(List<DiscoveryServiceSequence> 
originalSequences, List<DiscoveryServiceSequence> additionalSequences) {
+               // Example scenario for combining service sequences:
+               //
+               // Lets assume a service S that generates two annotation types 
AT1 and AT2 and S has prerequisite
+               // annotation type AT_P. There are two services P1 and P2 
creating annotation type AT_P.
+               // The possible service sequences for generating annotation 
type AT1 are "P1, S" and "P2, S", same for AT2.
+               //
+               // When requesting a set of annotation types AT1 and AT2, this 
will result in the following four combinations
+               // which contain several redundancies:
+               // "P1, S, P1, S", "P1, S, P2, S", "P2, S, P1, S", "P2, S, P2, 
S"
+               // 
+               // This method uses three ways of removing redundancies:
+               //
+               // 1. Given that class DiscoveryServiceSequence internally uses 
LinkedHashSet, duplicate services are removed from the
+               // service sequences, resulting in: "P1, S", "P1, S, P2", "P2, 
S, P1", "P2, S"
+               //
+               // 2. Service sequences are only merged if the last service of 
the additional sequence is not already part of the original
+               // one which results in: "P1, S", "P1, S", "P2, S", "P2, S"
+               // 
+               // 3. Duplicate sequences are ignored, resulting in: "P1, S", 
"P2, S" which is the final result.  
+
+               List<DiscoveryServiceSequence> discoveryApproaches = new 
ArrayList<DiscoveryServiceSequence>();
+               for (DiscoveryServiceSequence da1 : originalSequences) {
+                       for (DiscoveryServiceSequence da2 : 
additionalSequences) {
+                               DiscoveryServiceSequence da = new 
DiscoveryServiceSequence();
+                               
da.getServiceSequence().addAll(da1.getServiceSequence());
+
+                               // Add the second list only if its last 
serviceId is not already part of the first list
+                               // (Otherwise unnecessary prerequisite services 
might be added, because the 2nd list may use different ones)
+                               if 
(!da1.getServiceSequence().contains(da2.getServiceSequenceAsList().get(da2.getServiceSequenceAsList().size()
 - 1))) {
+                                       
da.getServiceSequence().addAll(da2.getServiceSequence());
+                               }
+
+                               // Avoid duplicate entries (uses 
DiscoveryServiceSequence.equals() method)
+                               if (!discoveryApproaches.contains(da)) {
+                                       discoveryApproaches.add(da);
+                               }
+                       }
+               }
+               return discoveryApproaches;
+       }
+
+       /**
+        * Internal method that determines all possible sequences of discovery 
services which could be used
+        * to generate a set of requested annotation types.
+        *
+        * Each discovery service creates one or multiple annotation types and 
may have prerequisite annotation types.
+        * As there may be multiple services creating the same annotation type 
(maybe by using different prerequisite
+        * annotation types), this may result in a complex dependencies. Using 
recursion, this method iterates through 
+        * all the dependencies in order to calculate a list of all possible 
sequences of discovery services that could
+        * be used to calculate the requested annotation types.
+        * 
+        * @param annotationTypes List of annotation types to be generated
+        * @return List of discovery service sequences that generate the 
requested annotation types
+        */
+       private List<DiscoveryServiceSequence> 
calculateDiscoveryServiceSequences(List<String> annotationTypes) {
+               List<DiscoveryServiceSequence> result = null;
+
+               for (String currentType : annotationTypes) {
+                       // Calculate discovery sequences for current annotation 
type
+                       List<DiscoveryServiceSequence> 
additionalDiscoveryApproaches = 
getDiscoveryServiceSequencesForAnnotationType(currentType);
+                       if (result == null) {
+                               result = additionalDiscoveryApproaches;
+                       } else {
+                               // Merge with discovery sequences determined 
for the previous annotation types in the list 
+                               result = 
combineDiscoveryServiceSequences(result, additionalDiscoveryApproaches);
+                       }
+               }
+               return result;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultStatusQueueStore.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultStatusQueueStore.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultStatusQueueStore.java
new file mode 100755
index 0000000..20b7661
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultStatusQueueStore.java
@@ -0,0 +1,478 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequest;
+import org.apache.atlas.odf.api.metadata.models.Annotation;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.messaging.DiscoveryServiceQueueManager;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestSummary;
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.annotation.AnnotationStore;
+import org.apache.atlas.odf.api.annotation.AnnotationStoreUtils;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.metadata.MetaDataObjectReference;
+
+/**
+ * This class is an in-memory store for both request trackers (showing the 
status of analysis requests) as well as
+ * a for annotations. Both trackers and annotations are put on the ODF status 
queue which 
+ * (a) stores as "semi"-persistent store ("semi" because Kafka's retention 
mechanism will evantuall delete them), and
+ * (b) a way to propagate those changes to other ODF nodes.
+ * The annotations and trackers themselves are stored in memory in static 
variables.
+ * 
+ * This is how it works:
+ * 1. A single consumer thread listens on the status topic
+ * 2. If an incoming status queue entry is a tracker, it stores it in the 
in-memory tracker store
+ *    If it is an annotation, it stores it in the in-memory annotation store
+ * 3. Queries for trackers and annotations only go against the in-memory stores
+ * 4. When a check for overaged entries occurs (a check that removes trackers 
form the store which are older than the queue retention time)
+ *    the annotations for overaged and finished requests are also deleted (see 
removeOveragedEntries())
+ *   
+ *    
+ *
+ */
+public class DefaultStatusQueueStore implements AnalysisRequestTrackerStore, 
AnnotationStore {
+
+       static Logger logger = 
Logger.getLogger(DefaultStatusQueueStore.class.getName());
+       
+       public static final long IGNORE_SIMILAR_REQUESTS_TIMESPAN_MS = 5000;
+       
+       static Object globalRequestStoreMapLock = new Object();
+       
+       /*
+        * http://docs.oracle.com/javase/7/docs/api/java/util/LinkedHashMap.html
+        * 
+        * A structural modification is any operation that adds or deletes one 
or more mappings or, in the case of access-ordered linked hash maps, affects 
iteration order. 
+        * In insertion-ordered linked hash maps, merely changing the value 
associated with a key that is already contained in the map is not a structural 
modification. 
+        * In access-ordered linked hash maps, merely querying the map with get 
is a structural modification.) 
+        */
+       static LinkedHashMap<String, AnalysisRequestTracker> 
globalRequestStoreMap = new LinkedHashMap<String, AnalysisRequestTracker>();
+       
+       /*
+        * This map is only used to track if storing an object was successful
+        *  
+        */
+       static ConcurrentHashMap<String, Boolean> globalStoreSuccessMap = new 
ConcurrentHashMap<String, Boolean>();
+               
+       private String analysisRun;
+       
+       // simplest implementation for now: just keep a simple list
+       private static List<Annotation> storedAnnotations = new LinkedList<>();
+       private static Object storedAnnotationsLock = new Object();
+
+       /**
+        * This processor reads trackers from the queue and stores it in the 
globalRequestStoreMap.
+        * The thread for this processor is created in the QueueManager 
implementation.
+        *
+        */
+       public static class StatusQueueProcessor implements 
QueueMessageProcessor {
+               Logger logger = 
Logger.getLogger(StatusQueueProcessor.class.getName());
+
+               @Override
+               public void process(ExecutorService executorService, String 
message, int partition, long offset) {
+                       StatusQueueEntry sqe = new StatusQueueEntry();
+                       try {
+                               sqe = JSONUtils.fromJSON(message, 
StatusQueueEntry.class);
+                       } catch (Exception e) {
+                               logger.log(Level.WARNING, "Entry in status 
queue could not be processed", e);
+                       }
+                       
+                       // first handle trackers and / or initial cleanup
+                       synchronized (globalRequestStoreMapLock) {
+                               if (sqe.getAnalysisRequestTracker() != null) {
+                                       try {
+                                               AnalysisRequestTracker tracker 
= sqe.getAnalysisRequestTracker();
+                                               String requestID = 
tracker.getRequest().getId();
+                                               logger.log(Level.FINEST, "Store 
status queue: found tracker with id ''{0}'', tracker: {1}", new Object[] { 
requestID, message });
+                                               if (tracker.getStatus() == 
STATUS.FINISHED) {
+                                                       logger.log(Level.INFO, 
"Request with id ''{0}'' is finished, result: {1}", new Object[] { requestID, 
message });
+                                               }
+                                               //remove item so that it is 
added to the end of the list.
+                                               if 
(globalRequestStoreMap.containsKey(requestID)) {
+                                                       
globalRequestStoreMap.remove(requestID);
+                                               }
+
+                                               
globalRequestStoreMap.put(requestID, tracker);
+                                               if (tracker != null && 
tracker.getRevisionId() != null) {
+                                                       
globalStoreSuccessMap.put(tracker.getRevisionId(), true);
+                                               }
+
+                                       } catch (Exception e) {
+                                               logger.log(Level.WARNING, 
"Tracker entry in status queue could not be processed", e);
+                                       }
+                               }                               
+                       }
+                       
+                       if (sqe.getAnnotation() != null) {
+                               Annotation annot = sqe.getAnnotation();
+                               logger.log(Level.FINEST, "Received annotationk 
over status queue: ''{0}''", annot.getReference().getId());
+                               synchronized (storedAnnotationsLock) {
+                                       storedAnnotations.add(annot);
+                                       
globalStoreSuccessMap.put(annot.getReference().getId(), true);
+                               }
+                       }
+
+                       removeOveragedEntries();
+               }
+
+       }
+
+       /////////////////////////////////////////////
+       // AnalysisRequestTrackerStore interface implementation
+
+       
+       /*
+        * This store uses the lastModified timestamp to remove overaged 
trackers. 
+        * Therefore, the lastModified timestamp MUST be set before storing 
anything and prevent unwanted removal
+        */
+       @Override
+       public void store(AnalysisRequestTracker tracker) {
+               String id = tracker.getRequest().getId();
+               logger.fine("Store " + id + " in trackerStore");
+
+               String revId = UUID.randomUUID() + "_" + 
System.currentTimeMillis();
+               tracker.setRevisionId(revId);
+               globalStoreSuccessMap.put(revId, false);
+               
+               ODFInternalFactory factory = new ODFInternalFactory();
+               DiscoveryServiceQueueManager qm = 
factory.create(DiscoveryServiceQueueManager.class);
+               // put the tracker onto the status queue, the actual map that 
is used in query() is filled by the ARTProcessor listening on the status queue
+               StatusQueueEntry sqe = new StatusQueueEntry();
+               sqe.setAnalysisRequestTracker(tracker);
+               qm.enqueueInStatusQueue(sqe);
+               waitUntilEntryArrives(revId);
+       }
+
+       private void waitUntilEntryArrives(String entryId) {
+               boolean found = false;
+               int maxNumWaits = 1500;
+               int sleepMS = 20;
+               while (maxNumWaits > 0) {
+                       final Boolean storageSuccess = 
globalStoreSuccessMap.get(entryId);
+                       if (storageSuccess != null && storageSuccess == true) {
+                               found = true;
+                               globalStoreSuccessMap.remove(entryId);
+                               break;
+                       }
+                       try {
+                               Thread.sleep(sleepMS);
+                       } catch (InterruptedException e) {
+                               e.printStackTrace();
+                       }
+                       maxNumWaits--;
+               }
+               if(!found){
+                       final String message = "The tracker could not be stored 
in 30 sec!";
+                       logger.warning(message);
+                       throw new RuntimeException(message);
+               }else{
+                       logger.fine("Tracker stored after " + ((1500 - 
maxNumWaits) * sleepMS) + " ms");
+               }
+       }
+
+       @Override
+       public AnalysisRequestTracker query(String analysisRequestId) {
+               logger.fine("Querying store for " + analysisRequestId);
+               synchronized (globalRequestStoreMapLock) {
+                       AnalysisRequestTracker tracker = 
globalRequestStoreMap.get(analysisRequestId);
+                       return tracker;
+               }
+       }
+       
+       @Override
+       public void clearCache() {
+               logger.fine("Clearing store cache");
+               synchronized (globalRequestStoreMapLock) {
+                       globalRequestStoreMap.clear();
+               }
+       }
+       
+       private static void removeOveragedEntries(){
+               Set<String> finishedRequests = new HashSet<>();
+               logger.fine("Removing overaged entries from store");
+               synchronized (globalRequestStoreMapLock) {
+                       Iterator<Entry<String, AnalysisRequestTracker>> 
entryIterator = globalRequestStoreMap.entrySet().iterator();
+                       long maxRetentionMS = new 
ODFFactory().create().getSettingsManager().getODFSettings().getMessagingConfiguration().getAnalysisRequestRetentionMs();
+                       long currentTimeMS = System.currentTimeMillis();
+                       while(entryIterator.hasNext()){
+                               Entry<String, AnalysisRequestTracker> entry = 
entryIterator.next();
+                               AnalysisRequestTracker tracker = 
entry.getValue();
+                               if(currentTimeMS - tracker.getLastModified() >= 
maxRetentionMS){
+                                       if (tracker.getStatus() == 
STATUS.FINISHED || tracker.getStatus() == STATUS.ERROR) {
+                                               
finishedRequests.add(tracker.getRequest().getId());
+                                       }
+                                       entryIterator.remove();
+                                       logger.log(Level.INFO, "Removed 
overaged status tracker with id ''{0}''", new Object[] { entry.getKey() });
+                               }else{
+                                       /*
+                                        * items in a linkedHashMap are ordered 
in the way they were put into the map.
+                                        * Because of this, if one item is not 
overaged, all following won't be either
+                                       */
+                                       break;
+                               }
+                       }
+               }
+               synchronized (storedAnnotationsLock) {
+                       ListIterator<Annotation> it = 
storedAnnotations.listIterator();
+                       while (it.hasNext()) {
+                               Annotation annot = it.next();
+                               if 
(finishedRequests.contains(annot.getAnalysisRun())) {
+                                       it.remove();
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public int getSize() {
+               synchronized (globalRequestStoreMapLock) {
+                       return globalRequestStoreMap.keySet().size();
+               }
+       }
+       
+       @Override
+       public AnalysisRequestTracker findSimilarQueuedRequest(AnalysisRequest 
request) {
+               synchronized (globalRequestStoreMapLock) {
+                       for (AnalysisRequestTracker tracker : 
globalRequestStoreMap.values()) {
+                               long startedAfterLimit = 
System.currentTimeMillis() - IGNORE_SIMILAR_REQUESTS_TIMESPAN_MS;
+                               if (TrackerUtil.isAnalysisWaiting(tracker) || 
+                                               
(tracker.getNextDiscoveryServiceRequest() == 0 && tracker.getStatus() == 
STATUS.DISCOVERY_SERVICE_RUNNING && tracker.getLastModified() >= 
startedAfterLimit)) {
+                                       AnalysisRequest otherRequest = 
tracker.getRequest();
+                                       List<MetaDataObjectReference> dataSets 
= request.getDataSets();
+                                       List<MetaDataObjectReference> 
otherDataSets = otherRequest.getDataSets();
+                                       
+                                       if (otherDataSets.containsAll(dataSets) 
&& tracker.getDiscoveryServiceRequests().get(0).getDiscoveryServiceId().equals(
+                                                       
request.getDiscoveryServiceSequence().get(0))) {
+                                               logger.log(Level.FINEST, "Found 
similar request for request {0}", new Object[] { request.getId()});
+                                               return tracker;
+                                       }
+                               }
+                       }
+                       return null;
+               }
+       }
+
+       
+       @Override
+       public List<AnalysisRequestTracker> getRecentTrackers(int offset, int 
limit) {
+               if (offset < 0) {
+                       throw new RuntimeException("Offset parameter cannot be 
negative.");
+               }
+               if (limit < -1) {
+                       throw new RuntimeException("Limit parameter cannot be 
smaller than -1.");
+               }
+               synchronized (globalRequestStoreMapLock) {
+                       List<AnalysisRequestTracker> arsList = new 
ArrayList<>();
+                       Iterator<Map.Entry<String, AnalysisRequestTracker>> it 
= globalRequestStoreMap.entrySet().iterator();
+                       // filter out health check requests
+                       while (it.hasNext()) {
+                               AnalysisRequestTracker t = it.next().getValue();
+                               if 
(!t.getRequest().getDataSets().get(0).getId().startsWith(ControlCenter.HEALTH_TEST_DATA_SET_ID_PREFIX))
 {
+                                       arsList.add(t);
+                               }
+                       }
+                       // now pick number many requests from the end
+                       List<AnalysisRequestTracker> result = new ArrayList<>();
+                       if (arsList.size() > offset) {
+                               int startIndex = arsList.size() - offset - 
limit;
+                               if (limit == -1 || startIndex < 0) {
+                                       startIndex = 0;
+                               }
+                               int endIndex = arsList.size() - offset - 1;
+                               if (endIndex < 0) {
+                                       endIndex = 0;
+                               }
+                               for (int i=endIndex ; i>=startIndex; i--) {
+                                       result.add(arsList.get(i));
+                               }
+                       }
+                       return result;
+               }
+       }
+       
+       @Override
+       public AnalysisRequestSummary getRequestSummary() {
+               synchronized (globalRequestStoreMapLock) {
+                       try {
+                               List<AnalysisRequestTracker> recentTrackers = 
this.getRecentTrackers(0, -1);
+                               int totalSuccess = 0;
+                               int totalFailure = 0;
+       
+                               for (AnalysisRequestTracker tracker : 
recentTrackers) {
+                                       if 
(STATUS.FINISHED.equals(tracker.getStatus())) {
+                                               totalSuccess++;
+                                       } else if 
(STATUS.ERROR.equals(tracker.getStatus())) {
+                                               totalFailure++;
+                                       }
+                               }
+                               return new AnalysisRequestSummary(totalSuccess, 
totalFailure);
+                       } catch (Exception exc) {
+                               throw new RuntimeException(exc);
+                       }
+               }       
+       }
+
+       /////////////////////////////////////////////
+       // AnnotationStore interface implementation
+       
+       @Override
+       public Properties getProperties() {
+               Properties props = new Properties();
+               props.put(STORE_PROPERTY_TYPE, "DefaultAnnotationStore");
+               props.put(STORE_PROPERTY_ID, getRepositoryId());
+               props.put(STORE_PROPERTY_DESCRIPTION, "A default in-memory 
implementation of the annotation store storing its results via Kafka");
+               return props;
+       }
+
+       @Override
+       public String getRepositoryId() {
+               return "ODFDefaultAnnotationStore";
+       }
+
+       @Override
+       public ConnectionStatus testConnection() {
+               return ConnectionStatus.OK;
+       }
+
+       @Override
+       public MetaDataObjectReference store(Annotation annotation) {
+               // clone object
+               try {
+                       annotation = JSONUtils.cloneJSONObject(annotation);
+               } catch (JSONException e) {
+                       logger.log(Level.SEVERE, "Annotation could not be 
stored because JSON conversion failed.", e);
+                       throw new RuntimeException(e);
+               }
+               
+               // create a new reference
+               String annotId = "Annot" + UUID.randomUUID() + "_" + 
System.currentTimeMillis();
+               logger.log(Level.FINEST, "Storing annotation with ID ''{0}''", 
annotId);
+               MetaDataObjectReference ref = new MetaDataObjectReference();
+               ref.setId(annotId);
+               ref.setRepositoryId(getRepositoryId());
+               annotation.setReference(ref);
+               if (analysisRun != null) {
+                       annotation.setAnalysisRun(analysisRun);
+               }
+               
+               // re-use mechanism from status queue to wait until message has 
arrived via Kafka
+               globalStoreSuccessMap.put(annotId, false);
+               DiscoveryServiceQueueManager qm = new 
ODFInternalFactory().create(DiscoveryServiceQueueManager.class);
+               StatusQueueEntry sqe = new StatusQueueEntry();
+               sqe.setAnnotation(annotation);
+               qm.enqueueInStatusQueue(sqe);
+               waitUntilEntryArrives(annotId);
+               return ref;
+       }
+
+       @Override
+       public List<Annotation> getAnnotations(MetaDataObjectReference object, 
String analysisRequestId) {
+               List<Annotation> results = new ArrayList<>();
+               synchronized (storedAnnotationsLock) {
+                       logger.log(Level.FINEST, "Number of annotations stored: 
''{0}''", storedAnnotations.size());
+                       ListIterator<Annotation> it = 
storedAnnotations.listIterator();
+                       while (it.hasNext()) {
+                               Annotation annot = it.next();
+                               boolean match = true;
+                               if (object != null) {
+                                       match = match && 
object.equals(AnnotationStoreUtils.getAnnotatedObject(annot));
+                               }
+                               if (annot.getAnalysisRun() != null) {
+                                       // analysisRun is not set for health 
check and for some of the tests
+                                       if (analysisRequestId != null) {
+                                               match &= 
annot.getAnalysisRun().equals(analysisRequestId);
+                                       }
+                               }
+                               if (match) {
+                                       results.add(annot);
+                               }
+                       }
+               }
+               logger.log(Level.FINEST, "Number of annotations found for 
request Id ''{0}'': ''{1}''", new Object[]{analysisRequestId, results.size()});
+               return results;
+       }
+
+       @Override
+       public void setAnalysisRun(String analysisRun) {
+               this.analysisRun = analysisRun;
+       }
+
+       @Override
+       public String getAnalysisRun() {
+               return this.analysisRun;
+       }
+
+       @Override
+       public Annotation retrieveAnnotation(MetaDataObjectReference ref) {
+               synchronized (storedAnnotationsLock) {
+                       logger.log(Level.FINEST, "Number of annotations stored: 
''{0}''", storedAnnotations.size());
+                       ListIterator<Annotation> it = 
storedAnnotations.listIterator();
+                       while (it.hasNext()) {
+                               Annotation annot = it.next();
+                               if (annot.getReference().equals(ref)) {
+                                       return annot;
+                               }
+                       }
+               }
+               return null;
+       }
+
+       @Override
+       public void setStatusOfOldRequest(long cutOffTimestamp, STATUS status, 
String detailsMessage) {
+               synchronized (globalRequestStoreMapLock) {
+                       DiscoveryServiceQueueManager qm = new 
ODFInternalFactory().create(DiscoveryServiceQueueManager.class);
+                       for (AnalysisRequestTracker tracker : 
globalRequestStoreMap.values()) {
+                               if (tracker.getLastModified() < cutOffTimestamp 
//
+                                               && 
(STATUS.DISCOVERY_SERVICE_RUNNING.equals(tracker.getStatus()) //
+                                                               || 
STATUS.IN_DISCOVERY_SERVICE_QUEUE.equals(tracker.getStatus()) //
+                                                               || 
STATUS.INITIALIZED.equals(tracker.getStatus()) //
+                                               )) {
+                                       // set the tracker in-memory to have 
the result available immediately
+                                       tracker.setStatus(status);
+                                       if (detailsMessage == null) {
+                                               detailsMessage = "Setting 
request to " + status + " because it was last modified before " + new 
Date(cutOffTimestamp);
+                                       }
+                                       
tracker.setStatusDetails(detailsMessage);
+                                       // put tracker onto queue
+                                       StatusQueueEntry sqe = new 
StatusQueueEntry();
+                                       sqe.setAnalysisRequestTracker(tracker);
+                                       qm.enqueueInStatusQueue(sqe);
+                               }
+                       }
+               }
+               
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultThreadManager.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultThreadManager.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultThreadManager.java
new file mode 100755
index 0000000..0ea909f
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultThreadManager.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.lang.Thread.State;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.engine.ThreadStatus;
+
+public class DefaultThreadManager implements ThreadManager {
+
+       private Logger logger = 
Logger.getLogger(DefaultThreadManager.class.getName());
+
+       static Object unmanagedThreadLock = new Object();
+       static Map<String, Thread> unmanagedThreadMap = new HashMap<String, 
Thread>();
+       static Map<String, ODFRunnable> unmanagedThreadRunnableMap = new 
HashMap<String, ODFRunnable>();
+       
+       ExecutorService executorService;
+
+       public DefaultThreadManager() {
+       }
+       
+       private boolean isThreadRunning(Thread thread) {
+               return thread.getState() != State.TERMINATED;
+       }
+       
+       private void purgeTerminatedThreads() {
+               List<String> entriesToBeRemoved = new ArrayList<String>();
+               List<String> entriesToBeKept = new ArrayList<String>();
+               for (Map.Entry<String, Thread> entry : 
unmanagedThreadMap.entrySet()) {
+                       if (!isThreadRunning(entry.getValue())) {
+                               entriesToBeRemoved.add(entry.getKey());
+                       } else {
+                               entriesToBeKept.add(entry.getKey());
+                       }
+               }
+               for (String id : entriesToBeRemoved) {
+                       unmanagedThreadMap.remove(id);
+                       unmanagedThreadRunnableMap.remove(id);
+               }
+               logger.finer("Removed finished threads: " + 
entriesToBeRemoved.toString());
+               logger.finer("Kept unfinished threads: " + 
entriesToBeKept.toString());
+       }
+       
+       @Override
+       public ThreadStartupResult startUnmanagedThread(final String id, final 
ODFRunnable runnable) {
+               ThreadStartupResult result = new ThreadStartupResult(id) {
+                       @Override
+                       public boolean isReady() {
+                               synchronized (unmanagedThreadLock) {
+                                       if 
(unmanagedThreadRunnableMap.containsKey(id)) {
+                                               return 
unmanagedThreadRunnableMap.get(id).isReady();
+                                       }
+                               }
+                               return false;
+                       }
+               };
+               synchronized (unmanagedThreadLock) {
+                       purgeTerminatedThreads();
+                       Thread t = unmanagedThreadMap.get(id);
+                       if (t != null) {
+                               if (isThreadRunning(t)) {
+                                       return result;
+                               }
+                       } 
+                       runnable.setExecutorService(executorService);
+
+                       Thread newThread = new Thread(runnable);
+                       result.setNewThreadCreated(true);
+                       newThread.setUncaughtExceptionHandler(new 
UncaughtExceptionHandler() {
+
+                               @Override
+                               public void uncaughtException(Thread thread, 
Throwable throwable) {
+                                       logger.log(Level.WARNING, "Uncaught 
exception in thread " + id + " - Thread will shutdown!", throwable);
+                                       synchronized (unmanagedThreadLock) {
+                                               purgeTerminatedThreads();
+                                       }
+                               }
+                       });
+
+                       newThread.setDaemon(true); // TODO is it a daemon?
+                       newThread.start();
+                       unmanagedThreadMap.put(id, newThread);
+                       unmanagedThreadRunnableMap.put(id,  runnable);
+               }
+               return result;
+       }
+
+       @Override
+       public ThreadStatus.ThreadState getStateOfUnmanagedThread(String id) {
+               synchronized (unmanagedThreadLock) {
+                       Thread t = unmanagedThreadMap.get(id);
+                       if (t == null) {
+                               return ThreadStatus.ThreadState.NON_EXISTENT;
+                       }
+                       Thread.State ts = t.getState();
+                       switch (ts) {
+                       case TERMINATED:
+                               return ThreadStatus.ThreadState.FINISHED;
+                       default:
+                               return ThreadStatus.ThreadState.RUNNING;
+                       }
+               }
+       }
+
+
+
+       @Override
+       public void setExecutorService(ExecutorService executorService) {
+               this.executorService = executorService;
+       }
+
+       @Override
+       public void shutdownAllUnmanagedThreads() {
+               synchronized (unmanagedThreadLock) {
+                       logger.log(Level.INFO, "Shutting down all ODF 
threads...");
+                       for (String id : unmanagedThreadMap.keySet()) {
+                               shutdownThreadImpl(id, false);
+                       }
+                       unmanagedThreadMap.clear();
+                       unmanagedThreadRunnableMap.clear();
+                       logger.log(Level.INFO, "All ODF threads shutdown");
+                       purgeTerminatedThreads();
+               }               
+       }
+       
+       public void shutdownThreads(List<String> names) {
+               synchronized (unmanagedThreadLock) {
+                       for (String name : names) {
+                               shutdownThreadImpl(name, true);
+                       }
+               }               
+       }
+
+       private void shutdownThreadImpl(String id, boolean purge) {
+               Thread t = unmanagedThreadMap.get(id);
+               if (t == null) {
+                       return;
+               }
+               ODFRunnable r = unmanagedThreadRunnableMap.get(id);
+               r.cancel();
+               try {
+                       Thread.sleep(500);
+               } catch (InterruptedException e1) {
+                       e1.printStackTrace();
+               }
+               int max = 60;
+               while (t.getState() != Thread.State.TERMINATED) {
+                       if (max == 0) {
+                               break;
+                       }
+                       max--;
+                       try {
+                               Thread.sleep(1000);
+                       } catch (InterruptedException e) {
+                               // do nothing
+                               e.printStackTrace();
+                       }
+               }
+               if (max == 0) {
+                       logger.log(Level.WARNING, "Thread {0} did not stop on 
its own, must be interrupted.", id);
+                       t.interrupt();
+               }
+               if (purge) {
+                       purgeTerminatedThreads();
+               }
+       }
+
+       @Override
+       public int getNumberOfRunningThreads() {
+               synchronized (unmanagedThreadLock) {
+                       int result = 0;
+                       for (Thread t : unmanagedThreadMap.values()) {
+                               if (isThreadRunning(t)) {
+                                       result++;
+                               }
+                       }
+                       return result;
+               }
+       }
+
+       @Override
+       public List<ThreadStatus> getThreadManagerStatus() {
+               synchronized (unmanagedThreadLock) {
+                       List<ThreadStatus> result = new 
ArrayList<ThreadStatus>();
+                       for (Entry<String, Thread> entry : 
unmanagedThreadMap.entrySet()) {
+                               ThreadStatus status = new ThreadStatus();
+                               status.setId(entry.getKey());
+                               
status.setState(getStateOfUnmanagedThread(entry.getKey()));
+                               ODFRunnable odfRunnable = 
unmanagedThreadRunnableMap.get(entry.getKey());
+                               if (odfRunnable != null) {
+                                       
status.setType(odfRunnable.getClass().getName());
+                               }
+                               result.add(status);
+                       }
+
+                       return result;
+               }
+       }
+
+       @Override
+       public void waitForThreadsToBeReady(long waitingLimitMs, 
List<ThreadStartupResult> startedThreads) throws TimeoutException {
+               Set<String> threadsToWaitFor = new HashSet<String>();
+               for (ThreadStartupResult res : startedThreads) {
+                       //Only if a new thread was created we wait for it to be 
ready.
+                       if (res.isNewThreadCreated()) {
+                               threadsToWaitFor.add(res.getThreadId());
+                       }
+               }
+               if (threadsToWaitFor.isEmpty()) {
+                       return;
+               }
+
+               final int msToWait = 200;
+               final long maxPolls = waitingLimitMs / msToWait;
+               int count = 0;
+               while (threadsToWaitFor.size() > 0 && count < maxPolls) {
+                       List<String> ready = new ArrayList<String>();
+                       List<String> notReady = new ArrayList<String>();
+                       for (ThreadStartupResult thr : startedThreads) {
+                               if (thr.isReady()) {
+                                       ready.add(thr.getThreadId());
+                                       
threadsToWaitFor.remove(thr.getThreadId());
+                               } else {
+                                       notReady.add(thr.getThreadId());
+                               }
+                       }
+
+                       logger.fine("Ready: " + ready);
+                       logger.fine("NotReady: " + notReady);
+
+                       try {
+                               Thread.sleep(msToWait);
+                       } catch (InterruptedException e) {
+                               e.printStackTrace();
+                       }
+                       count++;
+               }
+               if (count >= maxPolls) {
+                       String msg = "Threads: " + threadsToWaitFor + "' are 
not ready yet after " + waitingLimitMs + " ms, give up to wait for it";
+                       logger.log(Level.WARNING, msg);
+                       throw new TimeoutException(msg);
+               }
+               
+               logger.fine("All threads ready after " + (count * msToWait) + 
"ms");
+       }
+
+       @Override
+       public ODFRunnable getRunnable(String name) {
+               synchronized (unmanagedThreadLock) {
+                       return unmanagedThreadRunnableMap.get(name);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultTransactionContextExecutor.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultTransactionContextExecutor.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultTransactionContextExecutor.java
new file mode 100755
index 0000000..0f79e0c
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DefaultTransactionContextExecutor.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.Callable;
+
+/**
+ * The default TransactionContextExecutor runs code in the same thread as the 
caller.
+ * 
+ */
+public class DefaultTransactionContextExecutor implements 
TransactionContextExecutor {
+       
+       @Override
+       public Object runInTransactionContext(Callable<Object> callable) throws 
Exception {
+               return callable.call();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceStarter.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceStarter.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceStarter.java
new file mode 100755
index 0000000..dbfb597
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceStarter.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.DataSetCheckResult;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse;
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.api.metadata.MetadataStore;
+import org.apache.atlas.odf.api.metadata.models.CachedMetadataStore;
+import org.apache.atlas.odf.api.metadata.models.MetaDataObject;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.analysis.AnalysisRequestTrackerStatus.STATUS;
+import org.apache.atlas.odf.api.discoveryservice.AnalysisRequestTracker;
+import org.apache.atlas.odf.api.discoveryservice.datasets.DataSetContainer;
+import 
org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.Utils;
+import org.apache.atlas.odf.core.annotation.InternalAnnotationStoreUtils;
+import org.apache.atlas.odf.json.JSONUtils;
+
+/**
+ * This class processes the entries of a discovery service queue and runs its 
respective discovery services in a separate thread. 
+ * 
+ */
+public class DiscoveryServiceStarter implements QueueMessageProcessor {
+
+       private Logger logger = 
Logger.getLogger(DiscoveryServiceStarter.class.getName());
+
+       AnalysisRequestTrackerStore trackerStore = null;
+       ControlCenter controlCenter = null;
+       Environment environment = null;
+       
+       /**
+        * parameters must be a three element String[] containing the 
DiscoveryServiceRequest, the partition number (int) and the offset (long).
+        */
+       public DiscoveryServiceStarter() {
+               ODFInternalFactory factory = new ODFInternalFactory();
+               trackerStore = 
factory.create(AnalysisRequestTrackerStore.class);
+               controlCenter = factory.create(ControlCenter.class);
+               environment = factory.create(Environment.class);
+       }
+       
+       private DiscoveryServiceRequest 
cloneDSRequestAndAddServiceProps(DiscoveryServiceRequest request, boolean 
requiresMetaDataCache) throws JSONException {
+               DiscoveryServiceRequest clonedRequest = 
JSONUtils.cloneJSONObject(request);
+               Map<String, Object> additionalProps = 
clonedRequest.getAdditionalProperties();
+               if (additionalProps == null) {
+                       additionalProps = new HashMap<>();
+                       clonedRequest.setAdditionalProperties(additionalProps);
+               }
+               // add service specific properties
+               String id = request.getDiscoveryServiceId();
+               Map<String, String> serviceProps = 
environment.getPropertiesWithPrefix(id);
+               additionalProps.putAll(serviceProps);
+               
+               // add cached metadata objects to request if required
+               if (requiresMetaDataCache) {
+                       MetaDataObject mdo = 
request.getDataSetContainer().getDataSet();
+                       MetadataStore mds = new 
ODFInternalFactory().create(MetadataStore.class);
+                       
clonedRequest.getDataSetContainer().setMetaDataCache(CachedMetadataStore.retrieveMetaDataCache(mds,
 mdo));
+               }
+
+               return clonedRequest;
+       }
+
+       
+       /**
+        * starts the service taken from the service runtime topic.
+        */
+       public void process(ExecutorService executorService, String message, 
int partition, long offset) {
+               AnalysisRequestTracker tracker = null;
+               try {
+                       tracker = JSONUtils.fromJSON(message, 
AnalysisRequestTracker.class);
+                       logger.log(Level.FINEST, "DSStarter: received tracker 
{0}", JSONUtils.lazyJSONSerializer(tracker));
+                       // load tracker from store and check if it was 
cancelled in the meantime
+                       AnalysisRequestTracker storedRequest = 
trackerStore.query(tracker.getRequest().getId());
+
+                       if (storedRequest == null || storedRequest.getStatus() 
!= STATUS.CANCELLED) {
+                               // set tracker to running
+                               
tracker.setStatus(STATUS.DISCOVERY_SERVICE_RUNNING);
+                               trackerStore.store(tracker);
+                               
+                               DiscoveryServiceRequest nextRequest = 
TrackerUtil.getCurrentDiscoveryServiceStartRequest(tracker);
+                               if (nextRequest == null) {
+                                       logger.log(Level.WARNING, "Request in 
queue has wrong format");
+                                       tracker.setStatus(STATUS.ERROR);
+                               } else {
+                                       
nextRequest.setTakenFromRequestQueue(System.currentTimeMillis());
+                                       trackerStore.store(tracker);
+                                       String dsID = 
nextRequest.getDiscoveryServiceId();
+                                       SyncDiscoveryService nextService = 
ControlCenter.getDiscoveryServiceProxy(dsID, tracker.getRequest());
+                                       if (nextService == null) {
+                                               logger.log(Level.WARNING, 
"Discovery Service ''{0}'' could not be created", dsID);
+                                               throw new 
DiscoveryServiceUnreachableException("Java proxy for service with id " + dsID + 
" could not be created");
+                                       } else {
+                                               DataSetContainer ds = 
nextRequest.getDataSetContainer();
+                                               DataSetCheckResult checkResult 
= nextService.checkDataSet(ds);
+                                               if (checkResult.getDataAccess() 
== DataSetCheckResult.DataAccess.NotPossible) {
+                                                       String responseDetails 
= "";
+                                                       if 
(checkResult.getDetails() != null) {
+                                                               responseDetails 
= " Reason: " + checkResult.getDetails();
+                                                       }
+                                                       if 
(tracker.getRequest().isIgnoreDataSetCheck()) {
+                                                               String msg = 
MessageFormat.format("Discovery service ''{0}'' cannot process data set 
''{1}''.{2} - Ignoring and advancing to next service",
+                                                                               
new Object[]{dsID, ds.getDataSet().getReference(), responseDetails});
+                                                               
logger.log(Level.INFO, msg);
+                                                               // check for 
next queue
+                                                               
DiscoveryServiceSyncResponse dummyResponse = new DiscoveryServiceSyncResponse();
+                                                               
dummyResponse.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+                                                               
dummyResponse.setDetails(msg);
+                                                               
TrackerUtil.addDiscoveryServiceStartResponse(tracker, dummyResponse);
+                                                               
controlCenter.advanceToNextDiscoveryService(tracker);
+                                                       } else {
+                                                               
tracker.setStatus(STATUS.ERROR);
+                                                               String msg = 
MessageFormat.format("Discovery service ''{0}'' cannot process data set 
''{1}''.{2}",
+                                                                               
new Object[]{dsID, ds.getDataSet().getReference(), responseDetails});
+                                                               
tracker.setStatusDetails(msg);
+                                                               
logger.log(Level.WARNING, msg);
+                                                       }
+                                               } else {
+                                                       
nextService.setExecutorService(executorService);
+                                                       
runServiceInBackground(executorService, tracker, nextRequest, nextService);
+                                               }
+                                       }
+                               }
+                       }
+               } catch (DiscoveryServiceUnreachableException exc) {
+                       logger.log(Level.WARNING, "Discovery service could not 
be started because it is unreachable", exc);
+                       if (tracker != null) {
+                               tracker.setStatus(STATUS.ERROR);
+                               tracker.setStatusDetails(exc.getReason());
+                       }
+               } catch (Throwable exc) {
+                       logger.log(Level.WARNING, "An error occurred when 
starting the discovery service", exc);
+                       if (tracker != null) {
+                               tracker.setStatus(STATUS.ERROR);
+                               
tracker.setStatusDetails(Utils.getExceptionAsString(exc));
+                       }
+               }
+               updateTracker(tracker);
+       }
+
+       
+       class ServiceRunner implements ODFRunnable {
+               AnalysisRequestTracker tracker;
+               DiscoveryServiceRequest nextRequest;
+               SyncDiscoveryService nextService;
+               
+               public ServiceRunner(AnalysisRequestTracker tracker, 
DiscoveryServiceRequest nextRequest, SyncDiscoveryService nextService) {
+                       super();
+                       this.tracker = tracker;
+                       this.nextRequest = nextRequest;
+                       this.nextService = nextService;
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               runService(tracker, nextRequest, nextService);
+                       } catch (Throwable exc) {
+                               logger.log(Level.WARNING, "An error occurred 
when running the discovery service", exc);
+                               if (tracker != null) {
+                                       tracker.setStatus(STATUS.ERROR);
+                                       
tracker.setStatusDetails(Utils.getExceptionAsString(exc));
+                               }
+                       }
+                       updateTracker(tracker);
+               }
+               
+               @Override
+               public void setExecutorService(ExecutorService service) {
+                       
+               }
+               
+               @Override
+               public boolean isReady() {
+                       return true;
+               }
+               
+               @Override
+               public void cancel() {
+               }
+
+       }
+       
+       
+       private void runServiceInBackground(ExecutorService executorService, 
final AnalysisRequestTracker tracker, final DiscoveryServiceRequest 
nextRequest, final SyncDiscoveryService nextService) throws JSONException {
+               String suffix = nextRequest.getDiscoveryServiceId() + "_" + 
nextRequest.getOdfRequestId() + UUID.randomUUID().toString();
+               String runnerId = "DSRunner_" + suffix;
+               ThreadManager tm = new 
ODFInternalFactory().create(ThreadManager.class);
+               ServiceRunner serviceRunner = new ServiceRunner(tracker, 
nextRequest, nextService);
+               tm.setExecutorService(executorService);
+               tm.startUnmanagedThread(runnerId, serviceRunner);
+       }
+       
+       private void runService(AnalysisRequestTracker tracker, 
DiscoveryServiceRequest nextRequest, SyncDiscoveryService nextService) throws 
JSONException {
+               DiscoveryServiceResponse response = null;
+               String dsID = nextRequest.getDiscoveryServiceId();
+               boolean requiresAuxObjects = 
controlCenter.requiresMetaDataCache(nextService);
+               if (nextService instanceof SyncDiscoveryService) {
+                       SyncDiscoveryService nextServiceSync = 
(SyncDiscoveryService) nextService;
+                       logger.log(Level.FINER, "Starting synchronous analysis 
on service {0}", dsID);
+                       DiscoveryServiceSyncResponse syncResponse = 
nextServiceSync.runAnalysis(cloneDSRequestAndAddServiceProps(nextRequest, 
requiresAuxObjects));
+                       
nextRequest.setFinishedProcessing(System.currentTimeMillis());
+                       //Even if the analysis was concurrently cancelled we 
store the results since the service implementation could do this by itself 
either way.
+                       long before = System.currentTimeMillis();
+                       
InternalAnnotationStoreUtils.storeDiscoveryServiceResult(syncResponse.getResult(),
 tracker.getRequest());
+                       
nextRequest.setTimeSpentStoringResults(System.currentTimeMillis() - before);
+                       // remove result to reduce size of response
+                       syncResponse.setResult(null);
+                       response = syncResponse;
+               } else {
+                       throw new RuntimeException("Unknown Java proxy created 
for service with id " + dsID);
+               }
+
+               // process response
+               if (response.getCode() == null) {
+                       
response.setCode(DiscoveryServiceResponse.ResponseCode.UNKNOWN_ERROR);
+                       String origDetails = response.getDetails();
+                       response.setDetails(MessageFormat.format("Discovery 
service did not return a response code. Assuming error. Original message: {0}", 
origDetails));
+               }
+               switch (response.getCode()) {
+               case UNKNOWN_ERROR:
+                       TrackerUtil.addDiscoveryServiceStartResponse(tracker, 
response);
+                       tracker.setStatus(STATUS.ERROR);
+                       tracker.setStatusDetails(response.getDetails());
+                       logger.log(Level.WARNING, "Discovery Service ''{2}'' 
responded with an unknown error ''{0}'', ''{1}''", new Object[] { 
response.getCode().name(),
+                                       response.getDetails(), dsID });
+                       break;
+               case NOT_AUTHORIZED:
+                       TrackerUtil.addDiscoveryServiceStartResponse(tracker, 
response);
+                       tracker.setStatus(STATUS.ERROR);
+                       tracker.setStatusDetails(response.getDetails());
+                       logger.log(Level.WARNING, "Discovery Service ''{2}'' 
responded with an unauthorized ''{0}'', ''{1}''", new Object[] { 
response.getCode().name(),
+                                       response.getDetails(), dsID });
+                       break;
+               case TEMPORARILY_UNAVAILABLE:
+                       tracker.setStatus(STATUS.IN_DISCOVERY_SERVICE_QUEUE);
+                       logger.log(Level.INFO, "Discovery Service ''{2}'' 
responded that it is unavailable right now ''{0}'', ''{1}''", new Object[] {
+                                       response.getCode().name(), 
response.getDetails(), dsID });
+                       // reqeue and finish immediately
+                       controlCenter.getQueueManager().enqueue(tracker);
+                       return;
+               case OK:
+                       TrackerUtil.addDiscoveryServiceStartResponse(tracker, 
response);
+                       logger.log(Level.FINER, "Synchronous Discovery Service 
processed request ''{0}'', ''{1}''", new Object[] { response.getCode().name(), 
response.getDetails() });
+                       AnalysisRequestTracker storedTracker = 
trackerStore.query(tracker.getRequest().getId());
+                       //A user could've cancelled the analysis concurrently. 
In this case, ignore the response and don't overwrite the tracker
+                       if (storedTracker != null && storedTracker.getStatus() 
!= STATUS.CANCELLED) {
+                               // check for next queue
+                               
controlCenter.advanceToNextDiscoveryService(tracker);
+                       } else {
+                               logger.log(Level.FINER, "Not advancing analysis 
request because it was cancelled!");
+                       }
+                       break;
+               default:
+                       tracker.setStatus(STATUS.ERROR);
+                       tracker.setStatusDetails(response.getDetails());
+                       logger.log(Level.WARNING, "Discovery Service ''{2}'' 
responded with an unknown response ''{0}'', ''{1}''", new Object[] {
+                                       response.getCode().name(), 
response.getDetails(), dsID });
+                       break;
+               }
+       }
+
+       private boolean updateTracker(AnalysisRequestTracker tracker) {
+               boolean cancelled = false;
+               if (tracker != null) {
+                       AnalysisRequestTracker storedTracker = 
trackerStore.query(tracker.getRequest().getId());
+                       //A user could've cancelled the analysis concurrently. 
In this case, ignore the response and don't overwrite the tracker
+                       if (storedTracker == null || (! 
STATUS.CANCELLED.equals(storedTracker.getStatus())) ) {
+                               Utils.setCurrentTimeAsLastModified(tracker);
+                               trackerStore.store(tracker);
+                       } else {
+                               cancelled = true;
+                               logger.log(Level.FINER, "Not storing analysis 
tracker changes because it was cancelled!");
+                       }
+               }
+               return cancelled;
+       }
+       
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceUnreachableException.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceUnreachableException.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceUnreachableException.java
new file mode 100755
index 0000000..38e0747
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/DiscoveryServiceUnreachableException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+public class DiscoveryServiceUnreachableException extends RuntimeException {
+
+       private static final long serialVersionUID = 3581149213306073675L;
+       
+       private String reason;
+
+       public DiscoveryServiceUnreachableException(String reason) {
+               super(reason);
+               this.reason = reason;
+       }
+
+       public String getReason() {
+               return reason;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ExecutorServiceFactory.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ExecutorServiceFactory.java
new file mode 100755
index 0000000..4cba0f6
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/ExecutorServiceFactory.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class ExecutorServiceFactory {
+
+       static Object execServiceLock = new Object();
+       static ExecutorService executorService = null;
+       
+       public ExecutorService createExecutorService() {
+               synchronized (execServiceLock) {
+                       if (executorService == null) {
+                               executorService = 
Executors.newCachedThreadPool();
+                       }
+               }
+               return executorService;
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/HealthCheckServiceRuntime.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/HealthCheckServiceRuntime.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/HealthCheckServiceRuntime.java
new file mode 100755
index 0000000..848a673
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/HealthCheckServiceRuntime.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.SyncDiscoveryServiceBase;
+import 
org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.api.discoveryservice.*;
+
+public class HealthCheckServiceRuntime implements ServiceRuntime {
+       public static final String HEALTH_CHECK_RUNTIME_NAME = "HealthCheck";
+
+       @Override
+       public String getName() {
+               return HEALTH_CHECK_RUNTIME_NAME;
+       }
+
+       @Override
+       public long getWaitTimeUntilAvailable() {
+               return 0;
+       }
+
+       @Override
+       public DiscoveryService 
createDiscoveryServiceProxy(DiscoveryServiceProperties props) {
+               return new SyncDiscoveryServiceBase() {
+                       
+                       @Override
+                       public DiscoveryServiceSyncResponse 
runAnalysis(DiscoveryServiceRequest request) {
+                               DiscoveryServiceSyncResponse response = new 
DiscoveryServiceSyncResponse();
+                               
response.setCode(DiscoveryServiceResponse.ResponseCode.OK);
+                               response.setDetails("Health check service 
finished successfully");
+                               return response;
+                       }
+               };
+       }
+       
+       public static DiscoveryServiceProperties 
getHealthCheckServiceProperties() {            
+               DiscoveryServiceProperties props = new 
DiscoveryServiceProperties();
+               props.setId(ControlCenter.HEALTH_TEST_DISCOVERY_SERVICE_ID);
+               props.setDescription("Health check service");
+               
+               DiscoveryServiceEndpoint ep = new DiscoveryServiceEndpoint();
+               ep.setRuntimeName(HEALTH_CHECK_RUNTIME_NAME);
+               
+               props.setEndpoint(ep);
+               return props;
+       }
+
+       @Override
+       public String getDescription() {
+               return "Internal runtime dedicated to health checks";
+       }
+
+       @Override
+       public void validate(DiscoveryServiceProperties props) throws 
ValidationException {
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/JavaServiceRuntime.java
----------------------------------------------------------------------
diff --git 
a/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/JavaServiceRuntime.java
 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/JavaServiceRuntime.java
new file mode 100755
index 0000000..61a29b1
--- /dev/null
+++ 
b/odf/odf-core/src/main/java/org/apache/atlas/odf/core/controlcenter/JavaServiceRuntime.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.controlcenter;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.atlas.odf.api.discoveryservice.sync.SyncDiscoveryService;
+import org.apache.atlas.odf.api.settings.validation.ImplementationValidator;
+import org.apache.atlas.odf.api.settings.validation.ValidationException;
+import org.apache.atlas.odf.json.JSONUtils;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryService;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceJavaEndpoint;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceProperties;
+import org.apache.atlas.odf.api.discoveryservice.async.AsyncDiscoveryService;
+import org.apache.atlas.odf.core.Utils;
+
+public class JavaServiceRuntime implements ServiceRuntime {
+
+       Logger logger = Logger.getLogger(JavaServiceRuntime.class.getName());
+
+       public static final String NAME = "Java";
+       
+       @Override
+       public String getName() {
+               return NAME;
+       }
+
+       @Override
+       public long getWaitTimeUntilAvailable() {
+               // for now, always run
+               return 0;
+       }
+
+       @Override
+       public DiscoveryService 
createDiscoveryServiceProxy(DiscoveryServiceProperties props) {
+               DiscoveryService service = null;
+               String className = null;
+               try {
+                       className = JSONUtils.convert(props.getEndpoint(), 
DiscoveryServiceJavaEndpoint.class).getClassName();
+                       Class<?> clazz = Class.forName(className);
+                       Object o = clazz.newInstance();
+                       service = (DiscoveryService) o;
+               } catch (Exception e) {
+                       logger.log(Level.FINE, "An error occurred while 
instatiating Java implementation", e);
+                       logger.log(Level.WARNING, "Java implementation ''{0}'' 
for discovery service ''{1}'' could not be instantiated (internal error: 
''{2}'')",
+                                       new Object[] { className, 
props.getId(), e.getMessage() });
+                       return null;
+               }
+               if (service instanceof SyncDiscoveryService) {
+                       return new 
TransactionSyncDiscoveryServiceProxy((SyncDiscoveryService) service);
+               } else if (service instanceof AsyncDiscoveryService) {
+                       return new 
TransactionAsyncDiscoveryServiceProxy((AsyncDiscoveryService) service);
+               }
+               return service;
+       }
+
+       @Override
+       public String getDescription() {
+               return "The default Java runtime";
+       }
+
+       @Override
+       public void validate(DiscoveryServiceProperties props) throws 
ValidationException {
+               DiscoveryServiceJavaEndpoint javaEP;
+               try {
+                       javaEP = JSONUtils.convert(props.getEndpoint(), 
DiscoveryServiceJavaEndpoint.class);
+               } catch (JSONException e) {
+                       throw new ValidationException("Endpoint definition for 
Java service is not correct: " + Utils.getExceptionAsString(e));
+               }
+               new ImplementationValidator().validate("Service.endpoint", 
javaEP.getClassName());
+       }
+
+}


Reply via email to