Feed Connection Refactoring

1. The feed subscription network using FeedJoint is removed.
2. FeedConnection metadata dataset is added (pkeys: dataverseName,
   feedName, datasetName).
3. Replaced the old intake job + collect job combination with one single
   job using SplitOperator.
4. Now one feed can connect to multiple datasets.
5. The disconnect feed job is replaced by ActiveManagerMessage.
6. The new feed life cycle is:
   - Create feed
   - Connect feed to dataset0, dataset1, dataset2, etc.
   - Start feed
   - Stop feed
   - Disconnect feed
 7. New feedEventListner framework by Abdullah

Change-Id: Ic36267eb9a10df21734ce1cc1f38583e23c9e8f0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1259
Tested-by: Jenkins <[email protected]>
Reviewed-by: Till Westmann <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: abdullah alamoudi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/fff200ca
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/fff200ca
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/fff200ca

Branch: refs/heads/master
Commit: fff200ca8e932edff58862fcc95502d65ceebd01
Parents: 692b8a8
Author: Abdullah Alamoudi <[email protected]>
Authored: Sat Feb 18 20:32:14 2017 -0800
Committer: abdullah alamoudi <[email protected]>
Committed: Sat Feb 18 23:13:25 2017 -0800

----------------------------------------------------------------------
 .../org/apache/asterix/active/ActiveEvent.java  |  27 +-
 .../org/apache/asterix/active/ActiveJob.java    |  92 ---
 .../active/ActiveJobNotificationHandler.java    | 147 ++---
 .../asterix/active/ActiveLifecycleListener.java |  11 +-
 .../apache/asterix/active/ActivityState.java    |  26 +-
 .../active/IActiveEntityEventsListener.java     |  43 +-
 .../asterix/active/IActiveEventSubscriber.java  |  48 ++
 ...ceRandomPartitioningFeedComputationRule.java |   5 +-
 .../optimizer/rules/UnnestToDataScanRule.java   |   7 +-
 .../apache/asterix/app/external/FeedJoint.java  | 212 ------
 .../app/external/FeedWorkCollection.java        | 140 ----
 .../asterix/app/translator/QueryTranslator.java | 491 +++++---------
 .../apache/asterix/utils/FeedOperations.java    | 397 ++++++++++++
 .../metadata_dataset/metadata_dataset.1.adm     |  27 +-
 .../metadata_datatype/metadata_datatype.1.adm   | 131 ++--
 .../basic/metadata_index/metadata_index.1.adm   |  27 +-
 .../metadata_selfjoin/metadata_selfjoin.1.adm   |   1 +
 .../big_object_feed_20M.2.update.aql            |   2 +
 .../typed_adapter/typed_adapter.4.update.aql    |   2 +
 .../change-feed-with-meta-csv.2.update.aql      |   4 +-
 ...a-pk-in-meta-index-after-ingest.2.update.aql |   2 +
 ...ta-pk-in-meta-index-after-ingest.4.query.aql |   2 +-
 ...h-meta-pk-in-meta-index-in-meta.2.update.aql |   2 +
 ...index-with-missing-after-ingest.2.update.aql |   2 +
 ...-pk-in-meta-open-index-in-value.2.update.aql |   2 +
 ...in-meta-open-index-with-missing.2.update.aql |   2 +
 ...hange-feed-with-meta-pk-in-meta.2.update.aql |   2 +
 ...feed-with-meta-with-mixed-index.2.update.aql |   2 +
 .../feeds/change-feed/change-feed.2.update.aql  |   2 +
 .../feeds/connect-feed/connect-feed.0.ddl.aql   |  46 ++
 .../connect-feed/connect-feed.1.update.aql      |  29 +
 .../connect-feed/connect-feed.2.update.aql      |  20 +
 .../feeds/connect-feed/connect-feed.3.query.aql |  20 +
 .../feeds/connect-feed/connect-feed.4.ddl.aql   |  19 +
 .../feed-push-socket.2.update.aql               |   2 +
 .../feed-push-socket.3.server.aql               |   2 +-
 .../feed-push-socket.5.update.aql               |   1 +
 .../feed-with-external-function.1.ddl.aql       |   3 +-
 .../feed-with-external-function.3.update.aql    |   5 +-
 ...external-parser-with-open-index.4.update.aql |   2 +
 ...al-parser-with-two-open-indexes.4.update.aql |   2 +
 .../feed-with-external-parser.4.update.aql      |   2 +
 .../feed-with-filtered-dataset.2.update.aql     |   2 +
 .../feed-with-meta-pk-in-meta.2.update.aql      |   2 +
 .../feed-with-multiple-indexes.4.update.aql     |   2 +
 .../feeds/feeds_02/feeds_02.2.update.aql        |   6 +-
 .../queries/feeds/feeds_03/feeds_03.1.ddl.aql   |   7 +-
 .../feeds/feeds_03/feeds_03.2.update.aql        |   5 +
 .../queries/feeds/feeds_03/feeds_03.3.query.aql |   3 +-
 .../feeds/feeds_07/feeds_07.2.update.aql        |   6 +-
 .../feeds/feeds_08/feeds_08.2.update.aql        |   4 +-
 .../feeds/feeds_09/feeds_09.2.update.aql        |   6 +-
 .../feeds/feeds_10/feeds_10.2.update.aql        |   4 +-
 .../feeds/feeds_11/feeds_11.2.update.aql        |   8 +-
 .../feeds/feeds_12/feeds_12.2.update.aql        |   8 +-
 .../issue_230_feeds.2.update.aql                |   8 +-
 .../revised-tweet-parser.2.update.aql           |   2 +
 .../feeds/start-feed/start-feed.1.ddl.aql       |  59 ++
 .../feeds/start-feed/start-feed.2.update.aql    |  33 +
 .../feeds/start-feed/start-feed.3.server.aql    |  26 +
 .../feeds/start-feed/start-feed.4.sleep.aql     |  26 +
 .../feeds/start-feed/start-feed.5.update.aql    |  30 +
 .../feeds/start-feed/start-feed.6.query.aql     |  32 +
 .../feeds/start-feed/start-feed.7.server.aql    |  27 +
 .../feeds/start-feed/start-feed.8.ddl.aql       |  28 +
 .../start-started-feed.0.ddl.aql                |  40 ++
 .../start-started-feed.1.update.aql             |  21 +
 .../start-started-feed.2.sleep.aql              |  26 +
 .../start-started-feed.3.update.aql             |  21 +
 .../start-started-feed.4.ddl.aql                |  21 +
 .../stop-stopped-feed.0.ddl.aql                 |  41 ++
 .../stop-stopped-feed.1.update.aql              |  21 +
 .../stop-stopped-feed.2.sleep.aql               |  19 +
 .../stop-stopped-feed.3.update.aql              |  21 +
 .../stop-stopped-feed.4.sleep.aql               |  20 +
 .../stop-stopped-feed.5.update.aql              |  21 +
 .../twitter-feed/twitter-feed.2.update.aql      |   4 +-
 .../feeds/upsert-feed/upsert-feed.2.update.aql  |   2 +
 .../feeds/upsert-feed/upsert-feed.3.server.aql  |   2 +-
 .../feeds/upsert-feed/upsert-feed.5.update.aql  |   2 +-
 .../issue_251_dataset_hint_7.2.update.aql       |   6 +-
 ...taset_with_meta_primary_index-1.2.update.aql |   4 +-
 .../feeds/feeds_01/feeds_01.1.ddl.sqlpp         |   2 +-
 .../feeds/feeds_02/feeds_02.1.ddl.sqlpp         |   2 +-
 .../feeds/feeds_02/feeds_02.2.update.sqlpp      |   1 +
 .../feeds/feeds_03/feeds_03.1.ddl.sqlpp         |   4 +-
 .../feeds/feeds_03/feeds_03.2.update.sqlpp      |   4 +
 .../feeds/feeds_03/feeds_03.3.query.sqlpp       |   2 +-
 .../feeds/feeds_07/feeds_07.1.ddl.sqlpp         |   4 +-
 .../feeds/feeds_07/feeds_07.2.update.sqlpp      |   3 +-
 .../feeds/feeds_08/feeds_08.1.ddl.sqlpp         |   2 +-
 .../feeds/feeds_08/feeds_08.2.update.sqlpp      |   2 +
 .../feeds/feeds_09/feeds_09.1.ddl.sqlpp         |   2 +-
 .../feeds/feeds_09/feeds_09.2.update.sqlpp      |   1 +
 .../feeds/feeds_10/feeds_10.1.ddl.sqlpp         |   2 +-
 .../feeds/feeds_10/feeds_10.2.update.sqlpp      |   1 +
 .../feeds/feeds_11/feeds_11.1.ddl.sqlpp         |   2 +-
 .../feeds/feeds_11/feeds_11.2.update.sqlpp      |   1 +
 .../feeds/feeds_12/feeds_12.1.ddl.sqlpp         |   2 +-
 .../feeds/feeds_12/feeds_12.2.update.sqlpp      |   1 +
 .../issue_230_feeds/issue_230_feeds.1.ddl.sqlpp |   2 +-
 .../issue_230_feeds.2.update.sqlpp              |   1 +
 .../upsert-feed/upsert-feed.2.update.sqlpp      |   1 +
 .../upsert-feed/upsert-feed.5.update.sqlpp      |   1 +
 .../issue_251_dataset_hint_7.1.ddl.sqlpp        |   2 +-
 .../issue_251_dataset_hint_7.2.update.sqlpp     |   2 +
 .../change-feed-with-meta-pk-in-meta.4.adm      |   2 +-
 .../change-feed-with-meta-pk-in-meta.5.adm      |   2 +-
 .../feeds/connect-feed/connect-feed.1.adm       |   3 +
 .../results/feeds/feeds_01/feeds_01.1.adm       |   2 +-
 .../results/feeds/feeds_03/feeds_03.1.adm       |   2 +-
 .../results/feeds/start-feed/start-feed.1.adm   |   2 +
 .../results/types/any-object/any-object.2.adm   | 137 ++--
 .../src/test/resources/runtimets/testsuite.xml  |  22 +
 .../asterix/common/metadata/IDataset.java       |  23 +
 .../client/FileFeedSocketAdapterClient.java     |  13 +-
 .../external/feed/api/FeedOperationCounter.java |  59 --
 .../api/IActiveLifecycleEventSubscriber.java    |  40 --
 .../apache/asterix/external/feed/api/IFeed.java |   6 -
 .../asterix/external/feed/api/IFeedJoint.java   | 122 ----
 .../asterix/external/feed/api/IFeedWork.java    |  28 -
 .../feed/api/IFeedWorkEventListener.java        |  41 --
 .../external/feed/api/IFeedWorkManager.java     |  25 -
 .../external/feed/api/ISubscribableRuntime.java |  42 --
 .../ActiveLifecycleEventSubscriber.java         |  69 --
 .../feed/management/FeedConnectionId.java       |   2 +-
 .../feed/management/FeedConnectionRequest.java  |  59 +-
 .../feed/management/FeedEventsListener.java     | 646 +++----------------
 .../external/feed/management/FeedInfo.java      |  53 --
 .../external/feed/management/FeedJointKey.java  |  83 ---
 .../feed/management/FeedWorkManager.java        |  50 --
 .../external/feed/message/EndFeedMessage.java   | 100 ---
 .../external/feed/message/FeedMessage.java      |  42 --
 .../feed/runtime/CollectionRuntime.java         | 100 ---
 .../external/feed/runtime/IngestionRuntime.java |  70 +-
 .../feed/runtime/SubscribableRuntime.java       |  59 --
 .../external/feed/watch/FeedConnectJobInfo.java | 108 ----
 .../feed/watch/FeedEventSubscriber.java         |  64 ++
 .../external/feed/watch/FeedIntakeInfo.java     |  68 --
 .../external/feed/watch/NoOpSubscriber.java     |  54 ++
 .../twitter/TwitterRecordReaderFactory.java     |   9 +
 .../factory/SocketServerInputStreamFactory.java |   4 +-
 .../FeedCollectOperatorDescriptor.java          |  25 +-
 .../FeedCollectOperatorNodePushable.java        |  51 +-
 .../operators/FeedIntakeOperatorDescriptor.java |  46 +-
 .../FeedIntakeOperatorNodePushable.java         |  65 +-
 .../FeedMessageOperatorDescriptor.java          |  56 --
 .../FeedMessageOperatorNodePushable.java        | 173 -----
 .../apache/asterix/external/util/FeedUtils.java |   2 +
 .../external/feed/test/InputHandlerTest.java    |   3 +-
 .../typed_adapter/typed_adapter.2.update.aql    |   2 +
 .../feed_ingest/feed_ingest.1.ddl.aql           |   7 +-
 .../feed_ingest/feed_ingest.2.update.aql        |   9 +-
 .../feed_ingest/feed_ingest.4.query.aql         |   4 +-
 .../issue-1636/issue-1636.02.ddl.aql            |   2 +
 .../issue-1636/issue-1636.04.ddl.aql            |   1 +
 .../issue-1636/issue-1636.08.ddl.aql            |   2 +-
 .../dataset-with-meta-record.3.update.aql       |   3 +-
 .../dataset-with-meta-record.5.adm              |   2 +-
 .../aql/statement/SubscribeFeedStatement.java   |  51 +-
 .../asterix-lang-aql/src/main/javacc/AQL.jj     |  37 +-
 .../asterix/lang/common/base/Statement.java     |  13 +-
 .../common/statement/ConnectFeedStatement.java  |  40 +-
 .../common/statement/CreateFeedStatement.java   |  40 +-
 .../statement/CreatePrimaryFeedStatement.java   |  59 --
 .../statement/CreateSecondaryFeedStatement.java |  62 --
 .../common/statement/StartFeedStatement.java    |  61 ++
 .../common/statement/StopFeedStatement.java     |  59 ++
 .../lang/common/visitor/FormatPrintVisitor.java |  48 +-
 .../base/AbstractQueryExpressionVisitor.java    |  14 +-
 .../lang/common/visitor/base/ILangVisitor.java  |  11 +-
 .../asterix-lang-sqlpp/src/main/javacc/SQLPP.jj |  70 +-
 .../apache/asterix/metadata/MetadataCache.java  | 131 ++--
 .../asterix/metadata/MetadataManager.java       |  51 +-
 .../apache/asterix/metadata/MetadataNode.java   |  66 +-
 .../metadata/MetadataTransactionContext.java    |  20 +-
 .../asterix/metadata/api/IMetadataManager.java  |  17 +-
 .../asterix/metadata/api/IMetadataNode.java     |  11 +
 .../metadata/bootstrap/MetadataBootstrap.java   |   2 +-
 .../bootstrap/MetadataPrimaryIndexes.java       |  11 +-
 .../metadata/bootstrap/MetadataRecordTypes.java | 111 +---
 .../metadata/declared/FeedDataSource.java       |  17 +-
 .../metadata/declared/MetadataManagerUtil.java  |  10 +
 .../metadata/declared/MetadataProvider.java     |   6 +
 .../asterix/metadata/entities/Dataset.java      |   7 +-
 .../apache/asterix/metadata/entities/Feed.java  |  31 +-
 .../metadata/entities/FeedConnection.java       | 117 ++++
 .../FeedConnectionTupleTranslator.java          | 179 +++++
 .../FeedTupleTranslator.java                    | 195 ++----
 .../metadata/feeds/FeedMetadataUtil.java        | 250 -------
 .../asterix/metadata/feeds/FeedOperations.java  | 182 ------
 .../metadata/utils/MetadataLockManager.java     |  29 +
 .../typed_adapter/typed_adapter.1.ddl.aql       |   4 +-
 .../typed_adapter/typed_adapter.2.update.aql    |   6 +-
 .../typed_adapter/typed_adapter.3.query.aql     |   4 +-
 .../feed_ingest/feed_ingest.1.ddl.aql           |  11 +-
 .../feed_ingest/feed_ingest.2.update.aql        |   7 +-
 .../feed_ingest/feed_ingest.3.query.aql         |   4 +-
 198 files changed, 3105 insertions(+), 4287 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
index c907a36..2669990 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
@@ -22,25 +22,27 @@ import org.apache.hyracks.api.job.JobId;
 
 public class ActiveEvent {
 
+    public enum Kind {
+        JOB_CREATED,
+        JOB_STARTED,
+        JOB_FINISHED,
+        PARTITION_EVENT,
+        EXTENSION_EVENT
+    }
+
     private final JobId jobId;
     private final EntityId entityId;
-    private final EventKind eventKind;
+    private final Kind eventKind;
     private final Object eventObject;
 
-    public enum EventKind {
-        JOB_START,
-        JOB_FINISH,
-        PARTITION_EVENT
-    }
-
-    public ActiveEvent(JobId jobId, ActiveEvent.EventKind eventKind, EntityId 
entityId, Object eventObject) {
+    public ActiveEvent(JobId jobId, Kind eventKind, EntityId entityId, Object 
eventObject) {
         this.jobId = jobId;
         this.entityId = entityId;
         this.eventKind = eventKind;
         this.eventObject = eventObject;
     }
 
-    public ActiveEvent(JobId jobId, ActiveEvent.EventKind eventKind, EntityId 
entityId) {
+    public ActiveEvent(JobId jobId, Kind eventKind, EntityId entityId) {
         this(jobId, eventKind, entityId, null);
     }
 
@@ -52,11 +54,16 @@ public class ActiveEvent {
         return entityId;
     }
 
-    public EventKind getEventKind() {
+    public Kind getEventKind() {
         return eventKind;
     }
 
     public Object getEventObject() {
         return eventObject;
     }
+
+    @Override
+    public String toString() {
+        return "JobId:" + jobId + ", " + "EntityId:" + entityId + ", " + 
"Kind" + eventKind;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJob.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJob.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJob.java
deleted file mode 100644
index 1e3eca1..0000000
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJob.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.io.Serializable;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class ActiveJob implements Serializable {
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = 
Logger.getLogger(ActiveJob.class.getName());
-    protected final EntityId entityId;
-    protected JobId jobId;
-    protected final Serializable jobObject;
-    protected ActivityState state;
-    protected JobSpecification spec;
-
-    public ActiveJob(EntityId entityId, JobId jobId, ActivityState state, 
Serializable jobInfo, JobSpecification spec) {
-        this.entityId = entityId;
-        this.state = state;
-        this.jobId = jobId;
-        this.jobObject = jobInfo;
-        this.spec = spec;
-    }
-
-    public ActiveJob(EntityId entityId, ActivityState state, Serializable 
jobInfo, JobSpecification spec) {
-        this.entityId = entityId;
-        this.state = state;
-        this.jobObject = jobInfo;
-        this.spec = spec;
-    }
-
-    public JobId getJobId() {
-        return jobId;
-    }
-
-    public void setJobId(JobId jobId) {
-        this.jobId = jobId;
-    }
-
-    public ActivityState getState() {
-        return state;
-    }
-
-    public void setState(ActivityState state) {
-        this.state = state;
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(this + " is in " + state + " state.");
-        }
-    }
-
-    public Object getJobObject() {
-        return jobObject;
-    }
-
-    public JobSpecification getSpec() {
-        return spec;
-    }
-
-    public void setSpec(JobSpecification spec) {
-        this.spec = spec;
-    }
-
-    @Override
-    public String toString() {
-        return jobId + " [" + jobObject + "]";
-    }
-
-    public EntityId getEntityId() {
-        return entityId;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index 0d1d8ab..e4491bd 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -24,7 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.ActiveEvent.EventKind;
+import org.apache.asterix.active.ActiveEvent.Kind;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
@@ -36,7 +36,7 @@ public class ActiveJobNotificationHandler implements Runnable 
{
     private static final boolean DEBUG = false;
     private final LinkedBlockingQueue<ActiveEvent> eventInbox;
     private final Map<EntityId, IActiveEntityEventsListener> 
entityEventListeners;
-    private final Map<JobId, ActiveJob> jobId2ActiveJobInfos;
+    private final Map<JobId, EntityId> jobId2ActiveJobInfos;
 
     private ActiveJobNotificationHandler() {
         this.eventInbox = new LinkedBlockingQueue<>();
@@ -51,16 +51,20 @@ public class ActiveJobNotificationHandler implements 
Runnable {
         while (!Thread.interrupted()) {
             try {
                 ActiveEvent event = getEventInbox().take();
-                ActiveJob jobInfo = jobId2ActiveJobInfos.get(event.getJobId());
-                EntityId entityId = jobInfo.getEntityId();
-                IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
-                if (DEBUG) {
-                    LOGGER.log(Level.WARNING, "Next event is of type " + 
event.getEventKind());
-                    LOGGER.log(Level.WARNING, "Notifying the listener");
-                }
-                listener.notify(event);
-                if (event.getEventKind() == EventKind.JOB_FINISH) {
-                    removeJob(event.getJobId(), listener);
+                EntityId entityId = jobId2ActiveJobInfos.get(event.getJobId());
+                if (entityId != null) {
+                    IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
+                    LOGGER.log(Level.FINER, "Next event is of type " + 
event.getEventKind());
+                    LOGGER.log(Level.FINER, "Notifying the listener");
+                    listener.notify(event);
+                    if (event.getEventKind() == Kind.JOB_FINISHED) {
+                        LOGGER.log(Level.FINER, "Removing the job");
+                        jobId2ActiveJobInfos.remove(event.getJobId());
+                        LOGGER.log(Level.FINER, "Removing the listener since 
it is not active anymore");
+                        entityEventListeners.remove(listener.getEntityId());
+                    }
+                } else {
+                    LOGGER.log(Level.SEVERE, "Entity not found for received 
message for job " + event.getJobId());
                 }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -71,29 +75,6 @@ public class ActiveJobNotificationHandler implements 
Runnable {
         LOGGER.log(Level.INFO, "Stopped " + 
ActiveJobNotificationHandler.class.getSimpleName());
     }
 
-    public void removeJob(JobId jobId, IActiveEntityEventsListener listener) {
-        removeFinishedJob(jobId, listener);
-        removeInactiveListener(listener);
-    }
-
-    private void removeFinishedJob(JobId jobId, IActiveEntityEventsListener 
listener) {
-        if (!listener.isEntityActive()) {
-            if (DEBUG) {
-                LOGGER.log(Level.WARNING, "Remove job" + jobId);
-            }
-            jobId2ActiveJobInfos.remove(jobId);
-        }
-    }
-
-    private void removeInactiveListener(IActiveEntityEventsListener listener) {
-        if (!listener.isEntityActive()) {
-            if (DEBUG) {
-                LOGGER.log(Level.WARNING, "Removing the listener since it is 
not active anymore");
-            }
-            entityEventListeners.remove(listener.getEntityId());
-        }
-    }
-
     public IActiveEntityEventsListener getActiveEntityListener(EntityId 
entityId) {
         if (DEBUG) {
             LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId 
entityId) was called with entity " + entityId);
@@ -103,61 +84,27 @@ public class ActiveJobNotificationHandler implements 
Runnable {
         return entityEventListeners.get(entityId);
     }
 
-    public synchronized ActiveJob[] getActiveJobs() {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "getActiveJobs()  was called");
-            LOGGER.log(Level.WARNING, "Number of jobs found: " + 
jobId2ActiveJobInfos.size());
-        }
-        return jobId2ActiveJobInfos.values().toArray(new 
ActiveJob[jobId2ActiveJobInfos.size()]);
-    }
-
-    public boolean isActiveJob(JobId jobId) {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "isActiveJob(JobId jobId) called with 
jobId: " + jobId);
-            boolean found = jobId2ActiveJobInfos.get(jobId) != null;
-            LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? 
"Active" : "Inactive"));
-        }
-        return jobId2ActiveJobInfos.get(jobId) != null;
-    }
-
     public EntityId getEntity(JobId jobId) {
-        ActiveJob jobInfo = jobId2ActiveJobInfos.get(jobId);
-        return jobInfo == null ? null : jobInfo.getEntityId();
+        return jobId2ActiveJobInfos.get(jobId);
     }
 
     public void notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification) {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING,
-                    "notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification) was called with jobId = "
-                            + jobId);
-        }
+        LOGGER.log(Level.FINER,
+                "notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification) was called with jobId = " + jobId);
         Object property = 
jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
-        if (property == null || !(property instanceof ActiveJob)) {
-            if (DEBUG) {
-                LOGGER.log(Level.WARNING, "Job was is not active. property 
found to be: " + property);
-            }
+        if (property == null || !(property instanceof EntityId)) {
+            LOGGER.log(Level.FINER, "Job was is not active. property found to 
be: " + property);
             return;
-        } else {
-            monitorJob(jobId, (ActiveJob) property);
         }
-        if (DEBUG) {
-            boolean found = jobId2ActiveJobInfos.get(jobId) != null;
-            LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? 
"Active" : "Inactive"));
-        }
-        ActiveJob jobInfo = jobId2ActiveJobInfos.get(jobId);
-        if (jobInfo != null) {
-            EntityId entityId = jobInfo.getEntityId();
-            IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
-            listener.notifyJobCreation(jobId, jobSpecification);
-            if (DEBUG) {
-                LOGGER.log(Level.WARNING, "Listener was notified" + jobId);
-            }
-        } else {
-            if (DEBUG) {
-                LOGGER.log(Level.WARNING,
-                        "Listener was not notified since it was not registered 
for the job " + jobId);
-            }
+        EntityId entityId = (EntityId) property;
+        monitorJob(jobId, entityId);
+        boolean found = jobId2ActiveJobInfos.get(jobId) != null;
+        LOGGER.log(Level.FINER, "Job was found to be: " + (found ? "Active" : 
"Inactive"));
+        IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
+        if (listener != null) {
+            listener.notify(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, 
jobSpecification));
         }
+        LOGGER.log(Level.FINER, "Listener was notified" + jobId);
     }
 
     public LinkedBlockingQueue<ActiveEvent> getEventInbox() {
@@ -174,9 +121,8 @@ public class ActiveJobNotificationHandler implements 
Runnable {
 
     public synchronized void registerListener(IActiveEntityEventsListener 
listener) throws HyracksDataException {
         if (DEBUG) {
-            LOGGER.log(Level.WARNING,
-                    "registerListener(IActiveEntityEventsListener listener) 
was called for the entity "
-                            + listener.getEntityId());
+            LOGGER.log(Level.FINER, 
"registerListener(IActiveEntityEventsListener listener) was called for the 
entity "
+                    + listener.getEntityId());
         }
         if (entityEventListeners.containsKey(listener.getEntityId())) {
             throw new HyracksDataException(
@@ -185,13 +131,23 @@ public class ActiveJobNotificationHandler implements 
Runnable {
         entityEventListeners.put(listener.getEntityId(), listener);
     }
 
-    public synchronized void monitorJob(JobId jobId, ActiveJob activeJob) {
+    public synchronized void unregisterListener(IActiveEntityEventsListener 
listener) throws HyracksDataException {
+        LOGGER.log(Level.FINER, 
"unregisterListener(IActiveEntityEventsListener listener) was called for the 
entity "
+                + listener.getEntityId());
+        IActiveEntityEventsListener registeredListener = 
entityEventListeners.remove(listener.getEntityId());
+        if (registeredListener == null) {
+            throw new HyracksDataException(
+                    "Active Entity Listener " + listener.getEntityId() + " 
hasn't been registered");
+        }
+    }
+
+    public synchronized void monitorJob(JobId jobId, EntityId activeJob) {
         if (DEBUG) {
             LOGGER.log(Level.WARNING, "monitorJob(JobId jobId, ActiveJob 
activeJob) called with job id: " + jobId);
             boolean found = jobId2ActiveJobInfos.get(jobId) != null;
             LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? 
"Active" : "Inactive"));
         }
-        if (entityEventListeners.containsKey(activeJob.getEntityId())) {
+        if (entityEventListeners.containsKey(activeJob)) {
             if (jobId2ActiveJobInfos.containsKey(jobId)) {
                 LOGGER.severe("Job is already being monitored for job: " + 
jobId);
                 return;
@@ -199,22 +155,9 @@ public class ActiveJobNotificationHandler implements 
Runnable {
             if (DEBUG) {
                 LOGGER.log(Level.WARNING, "monitoring started for job id: " + 
jobId);
             }
-            jobId2ActiveJobInfos.put(jobId, activeJob);
         } else {
-            LOGGER.severe("No listener was found for the entity: " + 
activeJob.getEntityId());
-        }
-    }
-
-    public synchronized void unregisterListener(IActiveEntityEventsListener 
listener) throws HyracksDataException {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING,
-                    "unregisterListener(IActiveEntityEventsListener listener) 
was called for the entity "
-                            + listener.getEntityId());
-        }
-        IActiveEntityEventsListener registeredListener = 
entityEventListeners.remove(listener.getEntityId());
-        if (registeredListener == null) {
-            throw new HyracksDataException(
-                    "Active Entity Listener " + listener.getEntityId() + " 
hasn't been registered");
+            LOGGER.severe("No listener was found for the entity: " + 
activeJob);
         }
+        jobId2ActiveJobInfos.put(jobId, activeJob);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
index fad30fa..6a10b0c 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
@@ -24,6 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.active.ActiveEvent.Kind;
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
@@ -48,7 +49,7 @@ public class ActiveLifecycleListener implements 
IJobLifecycleListener {
     public synchronized void notifyJobStart(JobId jobId) throws 
HyracksException {
         EntityId entityId = 
ActiveJobNotificationHandler.INSTANCE.getEntity(jobId);
         if (entityId != null) {
-            jobEventInbox.add(new ActiveEvent(jobId, 
ActiveEvent.EventKind.JOB_START, entityId));
+            jobEventInbox.add(new ActiveEvent(jobId, Kind.JOB_STARTED, 
entityId));
         }
     }
 
@@ -56,7 +57,7 @@ public class ActiveLifecycleListener implements 
IJobLifecycleListener {
     public synchronized void notifyJobFinish(JobId jobId) throws 
HyracksException {
         EntityId entityId = 
ActiveJobNotificationHandler.INSTANCE.getEntity(jobId);
         if (entityId != null) {
-            jobEventInbox.add(new ActiveEvent(jobId, 
ActiveEvent.EventKind.JOB_FINISH, entityId));
+            jobEventInbox.add(new ActiveEvent(jobId, Kind.JOB_FINISHED, 
entityId));
         } else {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
@@ -70,10 +71,8 @@ public class ActiveLifecycleListener implements 
IJobLifecycleListener {
     }
 
     public void receive(ActivePartitionMessage message) {
-        if 
(ActiveJobNotificationHandler.INSTANCE.isActiveJob(message.getJobId())) {
-            jobEventInbox.add(new ActiveEvent(message.getJobId(), 
ActiveEvent.EventKind.PARTITION_EVENT,
-                    message.getActiveRuntimeId().getEntityId(), message));
-        }
+        jobEventInbox.add(new ActiveEvent(message.getJobId(), 
Kind.PARTITION_EVENT,
+                message.getActiveRuntimeId().getEntityId(), message));
     }
 
     public void stop() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
index 1301535..c8abb84 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
@@ -18,11 +18,25 @@
  */
 package org.apache.asterix.active;
 
-// TODO: Document the state machine and its transition.
 public enum ActivityState {
-    CREATED,
-    INACTIVE,
-    ACTIVE,
-    UNDER_RECOVERY,
-    ENDED
+    /**
+     * The starting state and a possible terminal state. Next state can only 
be {@code ActivityState.STARTING}
+     */
+    STOPPED,
+    /**
+     * A terminal state
+     */
+    FAILED,
+    /**
+     * An intermediate state. Next state can only be {@code 
ActivityState.STARTED} or {@code ActivityState.FAILED}
+     */
+    STARTING,
+    /**
+     * An intermediate state. Next state can only be {@code 
ActivityState.STOPPING} or {@code ActivityState.FAILED}
+     */
+    STARTED,
+    /**
+     * An intermediate state. Next state can only be {@code 
ActivityState.STOPPED} or {@code ActivityState.FAILED}
+     */
+    STOPPING
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index d0fb5e8..ee8e776 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -18,19 +18,48 @@
  */
 package org.apache.asterix.active;
 
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.asterix.common.metadata.IDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IActiveEntityEventsListener {
 
-    public void notify(ActiveEvent message);
+    /**
+     * Notify the listener that an event related to the entity has taken place
+     * Examples of such events include
+     * 1. Job created
+     * 2. Job completed
+     * 3. Partition event
+     *
+     * @param event
+     *            the event that took place
+     */
+    void notify(ActiveEvent event);
 
-    public void notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification);
+    /**
+     * @return the state of the entity
+     */
+    ActivityState getState();
 
-    public boolean isEntityActive();
+    /**
+     * get a subscriber that waits till state has been reached.
+     *
+     * @param state
+     *            the desired state
+     * @throws HyracksDataException
+     *             a failure happened while waiting for the state
+     */
+    IActiveEventSubscriber subscribe(ActivityState state) throws 
HyracksDataException;
 
-    public EntityId getEntityId();
+    /**
+     * @return the active entity id
+     */
+    EntityId getEntityId();
 
-    public boolean isEntityUsingDataset(String dataverseName, String 
datasetName);
+    /**
+     * dataset
+     *
+     * @return
+     */
+    boolean isEntityUsingDataset(IDataset dataset);
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
new file mode 100644
index 0000000..7be5737
--- /dev/null
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+/**
+ * An active event subscriber that subscribe to events related to active entity
+ */
+public interface IActiveEventSubscriber {
+
+    /**
+     * Notify the subscriber of a new event
+     * @param event
+     */
+    void notify(ActiveEvent event);
+
+    /**
+     * Checkcs whether the subscriber is done receiving events
+     * @return
+     */
+    boolean done();
+
+    /**
+     * Wait until the terminal event has been received
+     * @throws InterruptedException
+     */
+    void sync() throws InterruptedException;
+
+    /**
+     * Stop watching events
+     */
+    void unsubscribe();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
index 6ff8a56..1a0ecd9 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -21,6 +21,7 @@ package org.apache.asterix.optimizer.rules;
 import org.apache.asterix.metadata.declared.DataSource;
 import org.apache.asterix.metadata.declared.FeedDataSource;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -59,8 +60,8 @@ public class IntroduceRandomPartitioningFeedComputationRule 
implements IAlgebrai
         }
 
         final FeedDataSource feedDataSource = (FeedDataSource) dataSource;
-        Feed feed = feedDataSource.getFeed();
-        if (feed.getAppliedFunction() == null) {
+        FeedConnection feedConnection = feedDataSource.getFeedConnection();
+        if (feedConnection.getAppliedFunctions() == null) {
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 74e6ec5..46b421b 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -33,6 +33,7 @@ import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
@@ -214,6 +215,8 @@ public class UnnestToDataScanRule implements 
IAlgebraicRewriteRule {
         Dataset dataset = 
metadataProvider.findDataset(aqlId.getDataverseName(), targetDataset);
         ARecordType feedOutputType = (ARecordType) 
metadataProvider.findType(aqlId.getDataverseName(), outputType);
         Feed sourceFeed = metadataProvider.findFeed(aqlId.getDataverseName(), 
sourceFeedName);
+        FeedConnection feedConnection = 
metadataProvider.findFeedConnection(aqlId.getDataverseName(), sourceFeedName,
+                targetDataset);
         ARecordType metaType = null;
         // Does dataset have meta?
         if (dataset.hasMetaPart()) {
@@ -260,8 +263,8 @@ public class UnnestToDataScanRule implements 
IAlgebraicRewriteRule {
         }
         FeedDataSource feedDataSource = new FeedDataSource(sourceFeed, aqlId, 
targetDataset, feedOutputType, metaType,
                 pkTypes, partitioningKeys, 
keyAccessScalarFunctionCallExpression, sourceFeed.getFeedId(),
-                sourceFeed.getFeedType(), 
FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
-                context.getComputationNodeDomain());
+                 FeedRuntimeType.valueOf(subscriptionLocation), 
locations.split(","),
+                context.getComputationNodeDomain(), feedConnection);
         
feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, 
feedPolicy);
         return feedDataSource;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJoint.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJoint.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJoint.java
deleted file mode 100644
index 0614d14..0000000
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJoint.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.app.external.FeedWorkCollection.SubscribeFeedWork;
-import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedConnectionRequest;
-import org.apache.asterix.external.feed.management.FeedEventsListener;
-import org.apache.asterix.external.feed.management.FeedJointKey;
-import org.apache.asterix.external.feed.management.FeedWorkManager;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-
-public class FeedJoint implements IFeedJoint {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOGGER = 
Logger.getLogger(FeedJoint.class.getName());
-
-    /** A unique key associated with the feed point **/
-    private final FeedJointKey key;
-
-    /** The state associated with the FeedJoint **/
-    private State state;
-
-    /** A list of subscribers that receive data from this FeedJoint **/
-    private final List<FeedConnectionId> receivers;
-
-    /** The feedId on which the feedPoint resides **/
-    private final EntityId ownerFeedId;
-
-    /** A list of feed subscription requests submitted for subscribing to the 
FeedPoint's data **/
-    private final List<FeedConnectionRequest> connectionRequests;
-
-    private final FeedRuntimeType connectionLocation;
-
-    private final FeedJointType type;
-
-    private FeedConnectionId provider;
-
-    public FeedJoint(FeedJointKey key, EntityId ownerFeedId, FeedRuntimeType 
subscriptionLocation, FeedJointType type,
-            FeedConnectionId provider) {
-        this.key = key;
-        this.ownerFeedId = ownerFeedId;
-        this.type = type;
-        this.receivers = new ArrayList<FeedConnectionId>();
-        this.state = State.CREATED;
-        this.connectionLocation = subscriptionLocation;
-        this.connectionRequests = new ArrayList<FeedConnectionRequest>();
-        this.provider = provider;
-    }
-
-    @Override
-    public int hashCode() {
-        return key.hashCode();
-    }
-
-    @Override
-    public void addReceiver(FeedConnectionId connectionId) {
-        receivers.add(connectionId);
-    }
-
-    @Override
-    public void removeReceiver(FeedConnectionId connectionId) {
-        receivers.remove(connectionId);
-    }
-
-    @Override
-    public synchronized void addConnectionRequest(FeedConnectionRequest 
request) {
-        connectionRequests.add(request);
-        if (state.equals(State.ACTIVE)) {
-            handlePendingConnectionRequest();
-        }
-    }
-
-    @Override
-    public synchronized void setState(State state) {
-        if (this.state.equals(state)) {
-            return;
-        }
-        this.state = state;
-        if (this.state.equals(State.ACTIVE)) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Feed joint " + this + " is now " + State.ACTIVE);
-            }
-            handlePendingConnectionRequest();
-        }
-    }
-
-    private void handlePendingConnectionRequest() {
-        for (FeedConnectionRequest connectionRequest : connectionRequests) {
-            FeedConnectionId connectionId =
-                    new 
FeedConnectionId(connectionRequest.getReceivingFeedId(), 
connectionRequest.getTargetDataset());
-            try {
-                FeedEventsListener listener = (FeedEventsListener) 
ActiveJobNotificationHandler.INSTANCE
-                        .getActiveEntityListener(connectionId.getFeedId());
-                SubscribeFeedWork work = new SubscribeFeedWork(
-                        listener.getConnectionLocations(this, 
connectionRequest).toArray(new String[] {}),
-                        connectionRequest);
-                FeedWorkManager.INSTANCE.submitWork(work, new 
SubscribeFeedWork.FeedSubscribeWorkEventListener());
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Submitted feed connection request " + 
connectionRequest + " at feed joint " + this);
-                }
-                addReceiver(connectionId);
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Unsuccessful attempt at submitting 
connection request " + connectionRequest
-                            + " at feed joint " + this + ". Message " + 
e.getMessage());
-                }
-                e.printStackTrace();
-            }
-        }
-        connectionRequests.clear();
-    }
-
-    @Override
-    public FeedConnectionId getReceiver(FeedConnectionId connectionId) {
-        for (FeedConnectionId cid : receivers) {
-            if (cid.equals(connectionId)) {
-                return cid;
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public String toString() {
-        return key.toString() + " [" + connectionLocation + "]" + "[" + state 
+ "]";
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null) {
-            return false;
-        }
-        if (o == this) {
-            return true;
-        }
-        if (!(o instanceof FeedJoint)) {
-            return false;
-        }
-        return ((FeedJoint) o).getFeedJointKey().equals(this.key);
-    }
-
-    @Override
-    public EntityId getOwnerFeedId() {
-        return ownerFeedId;
-    }
-
-    @Override
-    public List<FeedConnectionRequest> getConnectionRequests() {
-        return connectionRequests;
-    }
-
-    @Override
-    public FeedRuntimeType getConnectionLocation() {
-        return connectionLocation;
-    }
-
-    @Override
-    public FeedJointType getType() {
-        return type;
-    }
-
-    @Override
-    public FeedConnectionId getProvider() {
-        return provider;
-    }
-
-    @Override
-    public List<FeedConnectionId> getReceivers() {
-        return receivers;
-    }
-
-    public FeedJointKey getKey() {
-        return key;
-    }
-
-    @Override
-    public synchronized State getState() {
-        return state;
-    }
-
-    @Override
-    public FeedJointKey getFeedJointKey() {
-        return key;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
deleted file mode 100644
index ec7c239..0000000
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
-import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.common.context.IStorageComponentProvider;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.feed.api.IFeedWork;
-import org.apache.asterix.external.feed.api.IFeedWorkEventListener;
-import org.apache.asterix.external.feed.management.FeedConnectionRequest;
-import 
org.apache.asterix.external.feed.management.FeedConnectionRequest.ConnectionStatus;
-import org.apache.asterix.file.StorageComponentProvider;
-import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.statement.DataverseDecl;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.runtime.utils.AppContextInfo;
-import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.SessionConfig;
-import org.apache.asterix.translator.SessionConfig.OutputFormat;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-/**
- * A collection of feed management related task, each represented as an 
implementation of {@code IFeedWork}.
- */
-public class FeedWorkCollection {
-
-    private static Logger LOGGER = 
Logger.getLogger(FeedWorkCollection.class.getName());
-    private static final ILangCompilationProvider compilationProvider = new 
AqlCompilationProvider();
-    private static final IStorageComponentProvider storageComponentProvider = 
new StorageComponentProvider();
-
-    /**
-     * The task of subscribing to a feed to obtain data.
-     */
-    public static class SubscribeFeedWork implements IFeedWork {
-
-        private final Runnable runnable;
-
-        private final FeedConnectionRequest request;
-
-        @Override
-        public Runnable getRunnable() {
-            return runnable;
-        }
-
-        public SubscribeFeedWork(String[] locations, FeedConnectionRequest 
request) {
-            this.runnable = new SubscribeFeedWorkRunnable(locations, request);
-            this.request = request;
-        }
-
-        private static class SubscribeFeedWorkRunnable implements Runnable {
-
-            private static final DefaultStatementExecutorFactory qtFactory = 
new DefaultStatementExecutorFactory();
-            private final FeedConnectionRequest request;
-            private final String[] locations;
-
-            public SubscribeFeedWorkRunnable(String[] locations, 
FeedConnectionRequest request) {
-                this.request = request;
-                this.locations = locations;
-            }
-
-            @Override
-            public void run() {
-                try {
-                    //TODO(amoudi): route PrintWriter to log file
-                    PrintWriter writer = new PrintWriter(System.err, true);
-                    SessionConfig pc = new SessionConfig(writer, 
OutputFormat.ADM);
-                    DataverseDecl dataverseDecl = new DataverseDecl(
-                            new 
Identifier(request.getReceivingFeedId().getDataverse()));
-                    SubscribeFeedStatement subscribeStmt = new 
SubscribeFeedStatement(locations, request);
-                    List<Statement> statements = new ArrayList<>();
-                    statements.add(dataverseDecl);
-                    statements.add(subscribeStmt);
-                    IStatementExecutor translator = 
qtFactory.create(statements, pc, compilationProvider,
-                            storageComponentProvider);
-                    
translator.compileAndExecute(AppContextInfo.INSTANCE.getHcc(), null,
-                            QueryTranslator.ResultDelivery.IMMEDIATE);
-                    if (LOGGER.isEnabledFor(Level.INFO)) {
-                        LOGGER.info("Submitted connection requests for 
execution: " + request);
-                    }
-                } catch (Exception e) {
-                    if (LOGGER.isEnabledFor(Level.FATAL)) {
-                        LOGGER.fatal("Exception in executing " + request, e);
-                    }
-                }
-            }
-        }
-
-        public static class FeedSubscribeWorkEventListener implements 
IFeedWorkEventListener {
-
-            @Override
-            public void workFailed(IFeedWork work, Exception e) {
-                if (LOGGER.isEnabledFor(Level.WARN)) {
-                    LOGGER.warn(" Feed subscription request " + 
((SubscribeFeedWork) work).request
-                            + " failed with exception " + e);
-                }
-            }
-
-            @Override
-            public void workCompleted(IFeedWork work) {
-                ((SubscribeFeedWork) 
work).request.setSubscriptionStatus(ConnectionStatus.ACTIVE);
-                if (LOGGER.isEnabledFor(Level.INFO)) {
-                    LOGGER.info(" Feed subscription request " + 
((SubscribeFeedWork) work).request + " completed ");
-                }
-            }
-        }
-
-        public FeedConnectionRequest getRequest() {
-            return request;
-        }
-
-        @Override
-        public String toString() {
-            return "SubscribeFeedWork for [" + request + "]";
-        }
-    }
-}

Reply via email to