Repository: asterixdb Updated Branches: refs/heads/master beb5dc746 -> 7152182a3
Enable Feed Changes to work with BAD project Extracts the ActiveListener Enables listeners to survive after job executions Change-Id: Ib62184b67aff564475ef9b58790ff96409195b77 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1524 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/7152182a Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/7152182a Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/7152182a Branch: refs/heads/master Commit: 7152182a34b4845d0261f08d16f34695fc23acc4 Parents: beb5dc7 Author: Steven Glenn Jacobs <[email protected]> Authored: Thu Feb 23 21:11:21 2017 -0800 Committer: Steven Jacobs <[email protected]> Committed: Wed Mar 1 12:20:57 2017 -0800 ---------------------------------------------------------------------- .../active/ActiveJobNotificationHandler.java | 14 +++-- .../management/ActiveEntityEventsListener.java | 55 ++++++++++++++++++++ .../feed/management/FeedEventsListener.java | 25 ++------- 3 files changed, 68 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7152182a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java index e4491bd..d7998f8 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java @@ -55,14 +55,15 @@ public class ActiveJobNotificationHandler implements Runnable { if (entityId != null) { IActiveEntityEventsListener listener = entityEventListeners.get(entityId); LOGGER.log(Level.FINER, "Next event is of type " + event.getEventKind()); - LOGGER.log(Level.FINER, "Notifying the listener"); - listener.notify(event); if (event.getEventKind() == Kind.JOB_FINISHED) { LOGGER.log(Level.FINER, "Removing the job"); jobId2ActiveJobInfos.remove(event.getJobId()); - LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore"); - entityEventListeners.remove(listener.getEntityId()); } + if (listener != null) { + LOGGER.log(Level.FINER, "Notifying the listener"); + listener.notify(event); + } + } else { LOGGER.log(Level.SEVERE, "Entity not found for received message for job " + event.getJobId()); } @@ -75,6 +76,11 @@ public class ActiveJobNotificationHandler implements Runnable { LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName()); } + public synchronized void removeListener(IActiveEntityEventsListener listener) throws HyracksDataException { + LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore"); + unregisterListener(listener); + } + public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) { if (DEBUG) { LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7152182a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java new file mode 100644 index 0000000..365c3ce --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.feed.management; + +import java.util.List; + +import org.apache.asterix.active.ActivityState; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.IActiveEntityEventsListener; +import org.apache.asterix.common.metadata.IDataset; +import org.apache.hyracks.api.job.JobId; + +public abstract class ActiveEntityEventsListener implements IActiveEntityEventsListener { + + // members + protected EntityId entityId; + protected List<IDataset> datasets; + protected volatile ActivityState state; + protected JobId jobId; + + @Override + public EntityId getEntityId() { + return entityId; + } + + @Override + public ActivityState getState() { + return state; + } + + @Override + public boolean isEntityUsingDataset(IDataset dataset) { + return datasets.contains(dataset); + } + + public JobId getJobId() { + return jobId; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7152182a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java index 2a87cab..f49da3c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java @@ -25,9 +25,9 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.active.ActiveEvent; +import org.apache.asterix.active.ActiveJobNotificationHandler; import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; -import org.apache.asterix.active.IActiveEntityEventsListener; import org.apache.asterix.active.IActiveEventSubscriber; import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.asterix.common.metadata.IDataset; @@ -36,20 +36,15 @@ import org.apache.asterix.external.feed.watch.NoOpSubscriber; import org.apache.asterix.runtime.utils.AppContextInfo; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobStatus; -public class FeedEventsListener implements IActiveEntityEventsListener { +public class FeedEventsListener extends ActiveEntityEventsListener { // constants private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class.getName()); // members - private final EntityId entityId; - private final List<IDataset> datasets; private final String[] sources; private final List<IActiveEventSubscriber> subscribers; - private volatile ActivityState state; private int numRegistered; - private JobId jobId; public FeedEventsListener(EntityId entityId, List<IDataset> datasets, String[] sources) { this.entityId = entityId; @@ -111,6 +106,7 @@ public class FeedEventsListener implements IActiveEntityEventsListener { IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc(); JobStatus status = hcc.getJobStatus(jobId); state = status.equals(JobStatus.FAILURE) ? ActivityState.FAILED : ActivityState.STOPPED; + ActiveJobNotificationHandler.INSTANCE.removeListener(this); } private void start(ActiveEvent event) { @@ -119,16 +115,6 @@ public class FeedEventsListener implements IActiveEntityEventsListener { } @Override - public EntityId getEntityId() { - return entityId; - } - - @Override - public ActivityState getState() { - return state; - } - - @Override public IActiveEventSubscriber subscribe(ActivityState state) throws HyracksDataException { if (state != ActivityState.STARTED && state != ActivityState.STOPPED) { throw new HyracksDataException("Can only wait for STARTED or STOPPED state"); @@ -150,11 +136,6 @@ public class FeedEventsListener implements IActiveEntityEventsListener { return subscriber; } - @Override - public boolean isEntityUsingDataset(IDataset dataset) { - return datasets.contains(dataset); - } - public String[] getSources() { return sources; }
