http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java index fa48b58..2a87cab 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java @@ -19,635 +19,143 @@ package org.apache.asterix.external.feed.management; import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; +import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.asterix.active.ActiveEvent; -import org.apache.asterix.active.ActiveJob; import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveEntityEventsListener; -import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.external.feed.api.FeedOperationCounter; -import org.apache.asterix.external.feed.api.IFeedJoint; -import org.apache.asterix.external.feed.api.IFeedJoint.State; -import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber; -import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent; -import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; -import org.apache.asterix.external.feed.watch.FeedConnectJobInfo; -import org.apache.asterix.external.feed.watch.FeedIntakeInfo; -import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor; -import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor; -import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor; -import org.apache.asterix.external.util.FeedUtils.JobType; +import org.apache.asterix.active.IActiveEventSubscriber; +import org.apache.asterix.active.message.ActivePartitionMessage; +import org.apache.asterix.common.metadata.IDataset; +import org.apache.asterix.external.feed.watch.FeedEventSubscriber; +import org.apache.asterix.external.feed.watch.NoOpSubscriber; import org.apache.asterix.runtime.utils.AppContextInfo; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; -import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; -import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory; import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.dataflow.IConnectorDescriptor; -import org.apache.hyracks.api.dataflow.IOperatorDescriptor; -import org.apache.hyracks.api.dataflow.OperatorDescriptorId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobInfo; -import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; -import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor; -import org.apache.log4j.Logger; public class FeedEventsListener implements IActiveEntityEventsListener { - private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class); - private final Map<EntityId, Pair<FeedOperationCounter, List<IFeedJoint>>> feedPipeline; - private final List<IActiveLifecycleEventSubscriber> subscribers; - private final Map<Long, ActiveJob> jobs; - private final Map<Long, ActiveJob> intakeJobs; - private final Map<EntityId, FeedIntakeInfo> entity2Intake; - private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos; - private EntityId entityId; - private IFeedJoint sourceFeedJoint; - - public FeedEventsListener(EntityId entityId) { + // constants + private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class.getName()); + // members + private final EntityId entityId; + private final List<IDataset> datasets; + private final String[] sources; + private final List<IActiveEventSubscriber> subscribers; + private volatile ActivityState state; + private int numRegistered; + private JobId jobId; + + public FeedEventsListener(EntityId entityId, List<IDataset> datasets, String[] sources) { this.entityId = entityId; + this.datasets = datasets; + this.sources = sources; subscribers = new ArrayList<>(); - jobs = new HashMap<>(); - feedPipeline = new HashMap<>(); - entity2Intake = new HashMap<>(); - connectJobInfos = new HashMap<>(); - intakeJobs = new HashMap<>(); + state = ActivityState.STOPPED; } @Override - public void notify(ActiveEvent event) { + public synchronized void notify(ActiveEvent event) { try { switch (event.getEventKind()) { - case JOB_START: - handleJobStartEvent(event); + case JOB_STARTED: + start(event); break; - case JOB_FINISH: - handleJobFinishEvent(event); + case JOB_FINISHED: + finish(); break; case PARTITION_EVENT: - handlePartitionStart(event); + partition((ActivePartitionMessage) event.getEventObject()); break; default: - LOGGER.warn("Unknown Feed Event" + event); + LOGGER.log(Level.WARNING, "Unhandled feed event notification: " + event); break; } + notifySubscribers(event); } catch (Exception e) { - LOGGER.error("Unhandled Exception", e); - } - } - - private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception { - ActiveJob jobInfo = jobs.get(message.getJobId().getId()); - JobType jobType = (JobType) jobInfo.getJobObject(); - switch (jobType) { - case INTAKE: - handleIntakeJobStartMessage((FeedIntakeInfo) jobInfo); - break; - case FEED_CONNECT: - handleCollectJobStartMessage((FeedConnectJobInfo) jobInfo); - break; - default: - } - } - - private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception { - ActiveJob jobInfo = jobs.get(message.getJobId().getId()); - JobType jobType = (JobType) jobInfo.getJobObject(); - switch (jobType) { - case FEED_CONNECT: - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Collect Job finished for " + jobInfo); - } - handleFeedCollectJobFinishMessage((FeedConnectJobInfo) jobInfo); - break; - case INTAKE: - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Intake Job finished for feed intake " + jobInfo.getJobId()); - } - handleFeedIntakeJobFinishMessage((FeedIntakeInfo) jobInfo, message); - break; - default: - break; - } - } - - private synchronized void handlePartitionStart(ActiveEvent message) { - ActiveJob jobInfo = jobs.get(message.getJobId().getId()); - JobType jobType = (JobType) jobInfo.getJobObject(); - switch (jobType) { - case FEED_CONNECT: - ((FeedConnectJobInfo) jobInfo).partitionStart(); - if (((FeedConnectJobInfo) jobInfo).collectionStarted()) { - notifyFeedEventSubscribers(ActiveLifecycleEvent.FEED_COLLECT_STARTED); - } - break; - case INTAKE: - handleIntakePartitionStarts(message, jobInfo); - break; - default: - break; - - } - } - - private void handleIntakePartitionStarts(ActiveEvent message, ActiveJob jobInfo) { - if (feedPipeline.get(message.getEntityId()).first.decrementAndGet() == 0) { - ((FeedIntakeInfo) jobInfo).getIntakeFeedJoint().setState(State.ACTIVE); - jobInfo.setState(ActivityState.ACTIVE); - notifyFeedEventSubscribers(ActiveLifecycleEvent.FEED_INTAKE_STARTED); + LOGGER.log(Level.SEVERE, "Unhandled Exception", e); } } - public synchronized void registerFeedJoint(IFeedJoint feedJoint, int numOfPrividers) throws HyracksDataException { - Pair<FeedOperationCounter, List<IFeedJoint>> feedJointsOnPipeline = feedPipeline - .get(feedJoint.getOwnerFeedId()); - if (feedJointsOnPipeline == null) { - feedJointsOnPipeline = new Pair<>(new FeedOperationCounter(numOfPrividers), new ArrayList<IFeedJoint>()); - feedPipeline.put(feedJoint.getOwnerFeedId(), feedJointsOnPipeline); - feedJointsOnPipeline.second.add(feedJoint); - } else { - if (!feedJointsOnPipeline.second.contains(feedJoint)) { - feedJointsOnPipeline.second.add(feedJoint); + private synchronized void notifySubscribers(ActiveEvent event) { + notifyAll(); + Iterator<IActiveEventSubscriber> it = subscribers.iterator(); + while (it.hasNext()) { + IActiveEventSubscriber subscriber = it.next(); + if (subscriber.done()) { + it.remove(); } else { - throw new RuntimeDataException( - ErrorCode.FEED_MANAGEMENT_FEED_EVENT_LISTENER_FEED_JOINT_REGISTERED, feedJoint); - } - } - } - - public synchronized void deregisterFeedIntakeJob(JobId jobId) { - FeedIntakeInfo info = (FeedIntakeInfo) intakeJobs.remove(jobId.getId()); - jobs.remove(jobId.getId()); - entity2Intake.remove(info.getFeedId()); - List<IFeedJoint> joints = feedPipeline.get(info.getFeedId()).second; - joints.remove(info.getIntakeFeedJoint()); - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Deregistered feed intake job [" + jobId + "]"); - } - } - - private static synchronized void handleIntakeJobStartMessage(FeedIntakeInfo intakeJobInfo) throws Exception { - List<OperatorDescriptorId> intakeOperatorIds = new ArrayList<>(); - Map<OperatorDescriptorId, IOperatorDescriptor> operators = intakeJobInfo.getSpec().getOperatorMap(); - for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) { - IOperatorDescriptor opDesc = entry.getValue(); - if (opDesc instanceof FeedIntakeOperatorDescriptor) { - intakeOperatorIds.add(opDesc.getOperatorId()); - } - } - - IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc(); - JobInfo info = hcc.getJobInfo(intakeJobInfo.getJobId()); - List<String> intakeLocations = new ArrayList<>(); - for (OperatorDescriptorId intakeOperatorId : intakeOperatorIds) { - Map<Integer, String> operatorLocations = info.getOperatorLocations().get(intakeOperatorId); - int nOperatorInstances = operatorLocations.size(); - for (int i = 0; i < nOperatorInstances; i++) { - intakeLocations.add(operatorLocations.get(i)); - } - } - // intakeLocations is an ordered list; - // element at position i corresponds to location of i'th instance of operator - intakeJobInfo.setIntakeLocation(intakeLocations); - } - - public IFeedJoint getSourceFeedJoint(FeedConnectionId connectionId) { - FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId); - if (cInfo != null) { - return cInfo.getSourceFeedJoint(); - } - return null; - } - - public synchronized void registerFeedIntakeJob(EntityId feedId, JobId jobId, JobSpecification jobSpec) - throws HyracksDataException { - if (entity2Intake.get(feedId) != null) { - throw new RuntimeDataException( - ErrorCode.FEED_MANAGEMENT_FEED_EVENTS_LISTENER_ALREADY_HAVE_INTAKE_JOB); - } - if (intakeJobs.get(jobId.getId()) != null) { - throw new RuntimeDataException(ErrorCode.FEED_MANAGEMENT_FEED_EVENTS_LISTENER_INTAKE_JOB_REGISTERED); - } - if (jobs.get(jobId.getId()) != null) { - throw new RuntimeDataException(ErrorCode.FEED_MANAGEMENT_FEED_EVENTS_LISTENER_FEED_JOB_REGISTERED); - } - - Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(feedId); - sourceFeedJoint = null; - for (IFeedJoint joint : pair.second) { - if (joint.getType().equals(IFeedJoint.FeedJointType.INTAKE)) { - sourceFeedJoint = joint; - break; - } - } - - if (sourceFeedJoint != null) { - FeedIntakeInfo intakeJobInfo = - new FeedIntakeInfo(jobId, ActivityState.CREATED, feedId, sourceFeedJoint, jobSpec); - pair.first.setFeedJobInfo(intakeJobInfo); - entity2Intake.put(feedId, intakeJobInfo); - jobs.put(jobId.getId(), intakeJobInfo); - intakeJobs.put(jobId.getId(), intakeJobInfo); - - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Registered feed intake [" + jobId + "]" + " for feed " + feedId); - } - } else { - throw new RuntimeDataException(ErrorCode.FEED_MANAGEMENT_FEED_EVENT_REGISTER_INTAKE_JOB_FAIL, jobId, - feedId); - } - } - - public synchronized void registerFeedCollectionJob(EntityId sourceFeedId, FeedConnectionId connectionId, - JobId jobId, JobSpecification jobSpec, Map<String, String> feedPolicy) throws HyracksDataException { - if (jobs.get(jobId.getId()) != null) { - throw new RuntimeDataException(ErrorCode.FEED_MANAGEMENT_FEED_EVENTS_LISTENER_FEED_JOB_REGISTERED); - } - if (connectJobInfos.containsKey(jobId.getId())) { - throw new RuntimeDataException(ErrorCode.FEED_MANAGEMENT_FEED_EVENTS_LISTENER_FEED_JOB_REGISTERED); - } - - List<IFeedJoint> feedJoints = feedPipeline.get(sourceFeedId).second; - FeedConnectionId cid = null; - IFeedJoint collectionSourceFeedJoint = null; - for (IFeedJoint joint : feedJoints) { - cid = joint.getReceiver(connectionId); - if (cid != null) { - collectionSourceFeedJoint = joint; - break; - } - } - - if (cid != null) { - FeedConnectJobInfo cInfo = new FeedConnectJobInfo(sourceFeedId, jobId, ActivityState.CREATED, connectionId, - collectionSourceFeedJoint, null, jobSpec, feedPolicy); - jobs.put(jobId.getId(), cInfo); - connectJobInfos.put(connectionId, cInfo); - - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Registered feed connection [" + jobId + "]" + " for feed " + connectionId); - } - } else { - LOGGER.warn( - "Could not register feed collection job [" + jobId + "]" + " for feed connection " + connectionId); - } - } - - @Override - public void notifyJobCreation(JobId jobId, JobSpecification spec) { - FeedConnectionId feedConnectionId = null; - Map<String, String> feedPolicy = null; - try { - for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) { - if (opDesc instanceof FeedCollectOperatorDescriptor) { - feedConnectionId = ((FeedCollectOperatorDescriptor) opDesc).getFeedConnectionId(); - feedPolicy = ((FeedCollectOperatorDescriptor) opDesc).getFeedPolicyProperties(); - registerFeedCollectionJob(((FeedCollectOperatorDescriptor) opDesc).getSourceFeedId(), - feedConnectionId, jobId, spec, feedPolicy); - return; - } else if (opDesc instanceof FeedIntakeOperatorDescriptor) { - registerFeedIntakeJob(((FeedIntakeOperatorDescriptor) opDesc).getFeedId(), jobId, spec); - return; + subscriber.notify(event); + if (subscriber.done()) { + it.remove(); } } - } catch (Exception e) { - LOGGER.error(e); - } - } - - public synchronized List<String> getConnectionLocations(IFeedJoint feedJoint, final FeedConnectionRequest request) - throws Exception { - List<String> locations = null; - switch (feedJoint.getType()) { - case COMPUTE: - FeedConnectionId connectionId = feedJoint.getProvider(); - FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId); - locations = cInfo.getComputeLocations(); - break; - case INTAKE: - FeedIntakeInfo intakeInfo = entity2Intake.get(feedJoint.getOwnerFeedId()); - locations = intakeInfo.getIntakeLocation(); - break; - default: - break; } - return locations; } - private synchronized void notifyFeedEventSubscribers(ActiveLifecycleEvent event) { - if (subscribers != null && !subscribers.isEmpty()) { - for (IActiveLifecycleEventSubscriber subscriber : subscribers) { - subscriber.handleEvent(event); + private void partition(ActivePartitionMessage message) { + if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) { + numRegistered++; + if (numRegistered == getSources().length) { + state = ActivityState.STARTED; } } } - private synchronized void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, ActiveEvent message) - throws Exception { + private void finish() throws Exception { IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc(); - JobInfo info = hcc.getJobInfo(message.getJobId()); - JobStatus status = info.getStatus(); - EntityId feedId = intakeInfo.getFeedId(); - Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(feedId); - if (status.equals(JobStatus.FAILURE)) { - pair.first.setFailedIngestion(true); - } - // remove feed joints - deregisterFeedIntakeJob(message.getJobId()); - // notify event listeners - feedPipeline.remove(feedId); - entity2Intake.remove(feedId); - notifyFeedEventSubscribers(pair.first.isFailedIngestion() ? ActiveLifecycleEvent.FEED_INTAKE_FAILURE - : ActiveLifecycleEvent.FEED_INTAKE_ENDED); - } - - private synchronized void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception { - FeedConnectionId connectionId = cInfo.getConnectionId(); - - IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc(); - JobInfo info = hcc.getJobInfo(cInfo.getJobId()); - JobStatus status = info.getStatus(); - boolean failure = status != null && status.equals(JobStatus.FAILURE); - FeedPolicyAccessor fpa = new FeedPolicyAccessor(cInfo.getFeedPolicy()); - boolean retainSubsription = - cInfo.getState().equals(ActivityState.UNDER_RECOVERY) || (failure && fpa.continueOnHardwareFailure()); - - if (!retainSubsription) { - IFeedJoint feedJoint = cInfo.getSourceFeedJoint(); - feedJoint.removeReceiver(connectionId); - if (LOGGER.isInfoEnabled()) { - LOGGER.info( - "Subscription " + cInfo.getConnectionId() + " completed successfully. Removed subscription"); - } - } - - connectJobInfos.remove(connectionId); - jobs.remove(cInfo.getJobId().getId()); - // notify event listeners - ActiveLifecycleEvent event = - failure ? ActiveLifecycleEvent.FEED_COLLECT_FAILURE : ActiveLifecycleEvent.FEED_COLLECT_ENDED; - notifyFeedEventSubscribers(event); - } - - public List<String> getFeedStorageLocations(FeedConnectionId connectionId) { - return connectJobInfos.get(connectionId).getStorageLocations(); - } - - public List<String> getFeedCollectLocations(FeedConnectionId connectionId) { - return connectJobInfos.get(connectionId).getCollectLocations(); + JobStatus status = hcc.getJobStatus(jobId); + state = status.equals(JobStatus.FAILURE) ? ActivityState.FAILED : ActivityState.STOPPED; } - public List<String> getFeedIntakeLocations(EntityId feedId) { - return entity2Intake.get(feedId).getIntakeLocation(); + private void start(ActiveEvent event) { + this.jobId = event.getJobId(); + state = ActivityState.STARTING; } - public JobId getFeedCollectJobId(FeedConnectionId connectionId) { - return connectJobInfos.get(connectionId).getJobId(); - } - - public boolean isFeedPointAvailable(FeedJointKey feedJointKey) { - List<IFeedJoint> joints = feedPipeline.containsKey(feedJointKey.getFeedId()) - ? feedPipeline.get(feedJointKey.getFeedId()).second : null; - if (joints != null && !joints.isEmpty()) { - for (IFeedJoint joint : joints) { - if (joint.getFeedJointKey().equals(feedJointKey)) { - return true; - } - } - } - return false; + @Override + public EntityId getEntityId() { + return entityId; } - public Collection<IFeedJoint> getFeedIntakeJoints() { - List<IFeedJoint> intakeFeedPoints = new ArrayList<>(); - for (FeedIntakeInfo info : entity2Intake.values()) { - intakeFeedPoints.add(info.getIntakeFeedJoint()); - } - return intakeFeedPoints; + @Override + public ActivityState getState() { + return state; } - public IFeedJoint getFeedJoint(FeedJointKey feedPointKey) { - List<IFeedJoint> joints = feedPipeline.containsKey(feedPointKey.getFeedId()) - ? feedPipeline.get(feedPointKey.getFeedId()).second : null; - if (joints != null && !joints.isEmpty()) { - for (IFeedJoint joint : joints) { - if (joint.getFeedJointKey().equals(feedPointKey)) { - return joint; - } - } + @Override + public IActiveEventSubscriber subscribe(ActivityState state) throws HyracksDataException { + if (state != ActivityState.STARTED && state != ActivityState.STOPPED) { + throw new HyracksDataException("Can only wait for STARTED or STOPPED state"); } - return null; - } - - public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJointKey) { - IFeedJoint feedJoint = getFeedJoint(feedJointKey); - if (feedJoint != null) { - return feedJoint; - } else { - String jointKeyString = feedJointKey.getStringRep(); - List<IFeedJoint> jointsOnPipeline = feedPipeline.containsKey(feedJointKey.getFeedId()) - ? feedPipeline.get(feedJointKey.getFeedId()).second : null; - IFeedJoint candidateJoint = null; - if (jointsOnPipeline != null) { - for (IFeedJoint joint : jointsOnPipeline) { - if (jointKeyString.contains(joint.getFeedJointKey().getStringRep()) && (candidateJoint == null - || /*found feed point is a super set of the earlier find*/joint.getFeedJointKey() - .getStringRep().contains(candidateJoint.getFeedJointKey().getStringRep()))) { - candidateJoint = joint; - } - } + synchronized (this) { + if (this.state == ActivityState.FAILED) { + throw new HyracksDataException("Feed has failed"); + } else if (this.state == state) { + return NoOpSubscriber.INSTANCE; } - return candidateJoint; + return doSubscribe(state); } } - public JobSpecification getCollectJobSpecification(FeedConnectionId connectionId) { - return connectJobInfos.get(connectionId).getSpec(); - } - - public IFeedJoint getFeedPoint(EntityId sourceFeedId, IFeedJoint.FeedJointType type) { - List<IFeedJoint> joints = feedPipeline.get(sourceFeedId).second; - for (IFeedJoint joint : joints) { - if (joint.getType().equals(type)) { - return joint; - } - } - return null; - } - - private void setLocations(FeedConnectJobInfo cInfo) { - JobSpecification jobSpec = cInfo.getSpec(); - - List<OperatorDescriptorId> collectOperatorIds = new ArrayList<>(); - List<OperatorDescriptorId> computeOperatorIds = new ArrayList<>(); - List<OperatorDescriptorId> storageOperatorIds = new ArrayList<>(); - - Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap(); - for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) { - IOperatorDescriptor opDesc = entry.getValue(); - IOperatorDescriptor actualOp; - if (opDesc instanceof FeedMetaOperatorDescriptor) { - actualOp = ((FeedMetaOperatorDescriptor) opDesc).getCoreOperator(); - } else { - actualOp = opDesc; - } - - if (actualOp instanceof AlgebricksMetaOperatorDescriptor) { - AlgebricksMetaOperatorDescriptor op = (AlgebricksMetaOperatorDescriptor) actualOp; - IPushRuntimeFactory[] runtimeFactories = op.getPipeline().getRuntimeFactories(); - boolean computeOp = false; - for (IPushRuntimeFactory rf : runtimeFactories) { - if (rf instanceof AssignRuntimeFactory) { - IConnectorDescriptor connDesc = jobSpec.getOperatorInputMap().get(op.getOperatorId()).get(0); - IOperatorDescriptor sourceOp = - jobSpec.getConnectorOperatorMap().get(connDesc.getConnectorId()).getLeft().getLeft(); - if (sourceOp instanceof FeedCollectOperatorDescriptor) { - computeOp = true; - break; - } - } - } - if (computeOp) { - computeOperatorIds.add(entry.getKey()); - } - } else if (actualOp instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) { - storageOperatorIds.add(entry.getKey()); - } else if (actualOp instanceof FeedCollectOperatorDescriptor) { - collectOperatorIds.add(entry.getKey()); - } - } - - try { - IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc(); - JobInfo info = hcc.getJobInfo(cInfo.getJobId()); - List<String> collectLocations = new ArrayList<>(); - for (OperatorDescriptorId collectOpId : collectOperatorIds) { - Map<Integer, String> operatorLocations = info.getOperatorLocations().get(collectOpId); - int nOperatorInstances = operatorLocations.size(); - for (int i = 0; i < nOperatorInstances; i++) { - collectLocations.add(operatorLocations.get(i)); - } - } - - List<String> computeLocations = new ArrayList<>(); - for (OperatorDescriptorId computeOpId : computeOperatorIds) { - Map<Integer, String> operatorLocations = info.getOperatorLocations().get(computeOpId); - if (operatorLocations != null) { - int nOperatorInstances = operatorLocations.size(); - for (int i = 0; i < nOperatorInstances; i++) { - computeLocations.add(operatorLocations.get(i)); - } - } else { - computeLocations.clear(); - computeLocations.addAll(collectLocations); - } - } - - List<String> storageLocations = new ArrayList<>(); - for (OperatorDescriptorId storageOpId : storageOperatorIds) { - Map<Integer, String> operatorLocations = info.getOperatorLocations().get(storageOpId); - if (operatorLocations == null) { - continue; - } - int nOperatorInstances = operatorLocations.size(); - for (int i = 0; i < nOperatorInstances; i++) { - storageLocations.add(operatorLocations.get(i)); - } - } - cInfo.setCollectLocations(collectLocations); - cInfo.setComputeLocations(computeLocations); - cInfo.setStorageLocations(storageLocations); - - } catch (Exception e) { - LOGGER.error("Error while setting feed active locations", e); - } - - } - - public synchronized void registerFeedEventSubscriber(IActiveLifecycleEventSubscriber subscriber) { + // Called within synchronized block + private FeedEventSubscriber doSubscribe(ActivityState state) { + FeedEventSubscriber subscriber = new FeedEventSubscriber(this, state); subscribers.add(subscriber); - } - - public void deregisterFeedEventSubscriber(IActiveLifecycleEventSubscriber subscriber) { - subscribers.remove(subscriber); - } - - public synchronized boolean isFeedConnectionActive(FeedConnectionId connectionId, - IActiveLifecycleEventSubscriber eventSubscriber) { - boolean active = false; - FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId); - if (cInfo != null) { - active = cInfo.getState().equals(ActivityState.ACTIVE); - } - if (active) { - registerFeedEventSubscriber(eventSubscriber); - } - return active; - } - - public FeedConnectJobInfo getFeedConnectJobInfo(FeedConnectionId connectionId) { - return connectJobInfos.get(connectionId); - } - - private void handleCollectJobStartMessage(FeedConnectJobInfo cInfo) throws ACIDException { - // set locations of feed sub-operations (intake, compute, store) - setLocations(cInfo); - Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(cInfo.getConnectionId().getFeedId()); - // activate joints - List<IFeedJoint> joints = pair.second; - for (IFeedJoint joint : joints) { - if (joint.getProvider().equals(cInfo.getConnectionId())) { - joint.setState(State.ACTIVE); - if (joint.getType().equals(IFeedJoint.FeedJointType.COMPUTE)) { - cInfo.setComputeFeedJoint(joint); - } - } - } - cInfo.setState(ActivityState.ACTIVE); - } - - private synchronized boolean isConnectedToDataset(String datasetName) { - for (FeedConnectionId connection : connectJobInfos.keySet()) { - if (connection.getDatasetName().equals(datasetName)) { - return true; - } - } - return false; - } - - public FeedConnectionId[] getConnections() { - return connectJobInfos.keySet().toArray(new FeedConnectionId[connectJobInfos.size()]); - } - - public boolean isFeedJointAvailable(FeedJointKey feedJointKey) { - return isFeedPointAvailable(feedJointKey); + return subscriber; } @Override - public boolean isEntityActive() { - return !jobs.isEmpty(); + public boolean isEntityUsingDataset(IDataset dataset) { + return datasets.contains(dataset); } - @Override - public EntityId getEntityId() { - return entityId; - } - - public IFeedJoint getSourceFeedJoint() { - return sourceFeedJoint; - } - - @Override - public boolean isEntityUsingDataset(String dataverseName, String datasetName) { - return isConnectedToDataset(datasetName); + public String[] getSources() { + return sources; } }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedInfo.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedInfo.java deleted file mode 100644 index 93f81d9..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedInfo.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.feed.management; - -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobInfo; -import org.apache.hyracks.api.job.JobSpecification; - -public class FeedInfo { - public JobSpecification jobSpec; - public JobInfo jobInfo; - public JobId jobId; - public FeedInfoType infoType; - public State state; - - public enum State { - ACTIVE, - INACTIVE - } - - public enum FeedInfoType { - INTAKE, - COLLECT - } - - public FeedInfo(JobSpecification jobSpec, JobId jobId, FeedInfoType infoType) { - this.jobSpec = jobSpec; - this.jobId = jobId; - this.infoType = infoType; - this.state = State.INACTIVE; - } - - @Override - public String toString() { - return " job id " + jobId; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java deleted file mode 100644 index 2905bb2..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.feed.management; - -import java.io.Serializable; -import java.util.List; - -import org.apache.asterix.active.EntityId; -import org.apache.commons.lang3.StringUtils; - -/** - * Represents a unique identifier for a Feed Joint. A Feed joint is a logical entity located - * along a feed ingestion pipeline at a point where the tuples moving as part of data flow - * constitute the feed. The feed joint acts as a network tap and allows the flowing data to be - * routed to multiple paths. - */ -public class FeedJointKey implements Serializable { - - private static final long serialVersionUID = 1L; - private final EntityId primaryFeedId; - private final List<String> appliedFunctions; - private final String stringRep; - - public FeedJointKey(EntityId feedId, List<String> appliedFunctions) { - this.primaryFeedId = feedId; - this.appliedFunctions = appliedFunctions; - StringBuilder builder = new StringBuilder(); - builder.append(feedId); - builder.append(":"); - builder.append(StringUtils.join(appliedFunctions, ':')); - stringRep = builder.toString(); - } - - public EntityId getFeedId() { - return primaryFeedId; - } - - public List<String> getAppliedFunctions() { - return appliedFunctions; - } - - public String getStringRep() { - return stringRep; - } - - @Override - public final String toString() { - return stringRep; - } - - @Override - public int hashCode() { - return stringRep.hashCode(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || !(o instanceof FeedJointKey)) { - return false; - } - return stringRep.equals(((FeedJointKey) o).stringRep); - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedWorkManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedWorkManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedWorkManager.java deleted file mode 100644 index bab4376..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedWorkManager.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.feed.management; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import org.apache.asterix.external.feed.api.IFeedWork; -import org.apache.asterix.external.feed.api.IFeedWorkEventListener; -import org.apache.asterix.external.feed.api.IFeedWorkManager; - -/** - * Handles asynchronous execution of feed management related tasks. - */ -public class FeedWorkManager implements IFeedWorkManager { - - public static final FeedWorkManager INSTANCE = new FeedWorkManager(); - - private final ExecutorService executorService = Executors.newCachedThreadPool(); - - private FeedWorkManager() { - } - - public void submitWork(IFeedWork work, IFeedWorkEventListener listener) { - Runnable runnable = work.getRunnable(); - try { - executorService.execute(runnable); - listener.workCompleted(work); - } catch (Exception e) { - listener.workFailed(work, e); - } - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java deleted file mode 100644 index 7a3a376..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.feed.message; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.asterix.active.EntityId; -import org.apache.asterix.external.feed.management.FeedConnectionId; -import org.apache.asterix.external.util.FeedConstants; -import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; - -import com.fasterxml.jackson.databind.node.ObjectNode; - -/** - * @deprecated A feed control message indicating the need to end the feed. This message is dispatched - * to all locations that host an operator involved in the feed pipeline. - * Instead, use IMessageBroker messages - */ -@Deprecated -public class EndFeedMessage extends FeedMessage { - - private static final long serialVersionUID = 1L; - - private final EntityId sourceFeedId; - - private final FeedConnectionId connectionId; - - private final FeedRuntimeType sourceRuntimeType; - - private final boolean completeDisconnection; - - private final EndMessageType endMessageType; - - public enum EndMessageType { - DISCONNECT_FEED, - DISCONTINUE_SOURCE - } - - public EndFeedMessage(FeedConnectionId connectionId, FeedRuntimeType sourceRuntimeType, EntityId sourceFeedId, - boolean completeDisconnection, EndMessageType endMessageType) { - super(MessageType.END); - this.connectionId = connectionId; - this.sourceRuntimeType = sourceRuntimeType; - this.sourceFeedId = sourceFeedId; - this.completeDisconnection = completeDisconnection; - this.endMessageType = endMessageType; - } - - @Override - public String toString() { - return MessageType.END.name() + " " + connectionId + " [" + sourceRuntimeType + "] "; - } - - public FeedRuntimeType getSourceRuntimeType() { - return sourceRuntimeType; - } - - public EntityId getSourceFeedId() { - return sourceFeedId; - } - - public boolean isCompleteDisconnection() { - return completeDisconnection; - } - - public EndMessageType getEndMessageType() { - return endMessageType; - } - - @Override - public ObjectNode toJSON() { - ObjectMapper om = new ObjectMapper(); - ObjectNode obj = om.createObjectNode(); - obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name()); - obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse()); - obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getEntityName()); - obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName()); - return obj; - } - - public FeedConnectionId getFeedConnectionId() { - return connectionId; - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java deleted file mode 100644 index 4f57fb5..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.feed.message; - -import org.apache.asterix.active.IActiveMessage; -import org.apache.hyracks.api.dataflow.value.JSONSerializable; - -/** - * A control message that can be sent to the runtime instance of a - * feed's adapter. - */ -public abstract class FeedMessage implements IActiveMessage, JSONSerializable { - - private static final long serialVersionUID = 1L; - - protected final MessageType messageType; - - public FeedMessage(MessageType messageType) { - this.messageType = messageType; - } - - public MessageType getMessageType() { - return messageType; - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java deleted file mode 100644 index 821a0b1..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.feed.runtime; - -import java.util.Map; - -import org.apache.asterix.active.ActiveRuntimeId; -import org.apache.asterix.active.IActiveRuntime; -import org.apache.asterix.external.feed.api.ISubscribableRuntime; -import org.apache.asterix.external.feed.dataflow.FeedFrameCollector; -import org.apache.asterix.external.feed.management.FeedConnectionId; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -/** - * Represents the feed runtime that collects feed tuples from another feed. - * In case of a primary feed, the CollectionRuntime collects tuples from the feed - * intake job. For a secondary feed, tuples are collected from the intake/compute - * runtime associated with the source feed. - */ -public class CollectionRuntime implements IActiveRuntime { - - private final FeedConnectionId connectionId; // [Dataverse - Feed - Dataset] - private final ISubscribableRuntime sourceRuntime; // Runtime that provides the data - private final Map<String, String> feedPolicy; // Policy associated with the feed - private final FeedFrameCollector frameCollector; // Collector that can be plugged into a frame distributor - private final IHyracksTaskContext ctx; - private final ActiveRuntimeId runtimeId; - - public CollectionRuntime(FeedConnectionId connectionId, ActiveRuntimeId runtimeId, - ISubscribableRuntime sourceRuntime, Map<String, String> feedPolicy, IHyracksTaskContext ctx, - FeedFrameCollector frameCollector) { - this.runtimeId = runtimeId; - this.connectionId = connectionId; - this.sourceRuntime = sourceRuntime; - this.feedPolicy = feedPolicy; - this.ctx = ctx; - this.frameCollector = frameCollector; - } - - public void waitTillCollectionOver() throws InterruptedException { - if (!(isCollectionOver())) { - synchronized (frameCollector) { - while (!isCollectionOver()) { - frameCollector.wait(); - } - } - } - } - - private boolean isCollectionOver() { - return frameCollector.getState().equals(FeedFrameCollector.State.FINISHED) - || frameCollector.getState().equals(FeedFrameCollector.State.HANDOVER); - } - - public Map<String, String> getFeedPolicy() { - return feedPolicy; - } - - public FeedConnectionId getConnectionId() { - return connectionId; - } - - public ISubscribableRuntime getSourceRuntime() { - return sourceRuntime; - } - - public FeedFrameCollector getFrameCollector() { - return frameCollector; - } - - public IHyracksTaskContext getCtx() { - return ctx; - } - - @Override - public ActiveRuntimeId getRuntimeId() { - return runtimeId; - } - - @Override - public void stop() throws HyracksDataException, InterruptedException { - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java index d32a604..3100704 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java @@ -19,79 +19,43 @@ package org.apache.asterix.external.feed.runtime; import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveRuntime; -import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter; -import org.apache.asterix.external.feed.dataflow.FeedFrameCollector; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.util.HyracksConstants; -import org.apache.hyracks.dataflow.common.utils.TaskUtil; -public class IngestionRuntime extends SubscribableRuntime { +public class IngestionRuntime implements IActiveRuntime { + + private static final Logger LOGGER = Logger.getLogger(IngestionRuntime.class.getName()); private final AdapterRuntimeManager adapterRuntimeManager; - private final IHyracksTaskContext ctx; - private int numSubscribers = 0; + private final ActiveRuntimeId runtimeId; + private final EntityId feedId; - public IngestionRuntime(EntityId entityId, ActiveRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter, - AdapterRuntimeManager adaptorRuntimeManager, IHyracksTaskContext ctx) { - super(entityId, runtimeId, feedWriter); + public IngestionRuntime(EntityId entityId, ActiveRuntimeId runtimeId, AdapterRuntimeManager adaptorRuntimeManager) { + this.feedId = entityId; + this.runtimeId = runtimeId; this.adapterRuntimeManager = adaptorRuntimeManager; - this.ctx = ctx; } @Override - public synchronized void subscribe(CollectionRuntime collectionRuntime) throws HyracksDataException { - FeedFrameCollector collector = collectionRuntime.getFrameCollector(); - dWriter.subscribe(collector); - subscribers.add(collectionRuntime); - if (numSubscribers == 0) { - TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx); - TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, - TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx), collectionRuntime.getCtx()); - start(); - } - numSubscribers++; - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Subscribed feed collection [" + collectionRuntime + "] to " + this); - } - } - - @Override - public synchronized void unsubscribe(CollectionRuntime collectionRuntime) throws InterruptedException { - numSubscribers--; - if (numSubscribers == 0) { - stop(); - } - subscribers.remove(collectionRuntime); - } - - public AdapterRuntimeManager getAdapterRuntimeManager() { - return adapterRuntimeManager; - } - - public void terminate() { - for (IActiveRuntime subscriber : subscribers) { - try { - unsubscribe((CollectionRuntime) subscriber); - } catch (Exception e) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Excpetion in unsubscribing " + subscriber + " message " + e.getMessage()); - } - } - } + public ActiveRuntimeId getRuntimeId() { + return this.runtimeId; } public void start() { adapterRuntimeManager.start(); + LOGGER.log(Level.INFO, "Feed " + feedId.getEntityName() + " running on partition " + runtimeId); } @Override public void stop() throws InterruptedException { adapterRuntimeManager.stop(); + LOGGER.log(Level.INFO, "Feed " + feedId.getEntityName() + " stopped on partition " + runtimeId); + } + + public EntityId getFeedId() { + return feedId; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java deleted file mode 100644 index fb70fdb..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.feed.runtime; - -import java.util.ArrayList; -import java.util.List; -import java.util.logging.Logger; - -import org.apache.asterix.active.ActiveRuntimeId; -import org.apache.asterix.active.EntityId; -import org.apache.asterix.active.IActiveRuntime; -import org.apache.asterix.external.feed.api.ISubscribableRuntime; -import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter; - -public abstract class SubscribableRuntime implements ISubscribableRuntime { - - protected static final Logger LOGGER = Logger.getLogger(SubscribableRuntime.class.getName()); - protected final EntityId feedId; - protected final List<IActiveRuntime> subscribers; - protected final DistributeFeedFrameWriter dWriter; - protected final ActiveRuntimeId runtimeId; - - public SubscribableRuntime(EntityId feedId, ActiveRuntimeId runtimeId, DistributeFeedFrameWriter dWriter) { - this.runtimeId = runtimeId; - this.feedId = feedId; - this.dWriter = dWriter; - this.subscribers = new ArrayList<>(); - } - - @Override - public ActiveRuntimeId getRuntimeId() { - return runtimeId; - } - - public EntityId getFeedId() { - return feedId; - } - - @Override - public String toString() { - return "SubscribableRuntime" + " [" + feedId + "]" + "(" + runtimeId + ")"; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java deleted file mode 100644 index 82cdddf..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.feed.watch; - -import java.util.List; -import java.util.Map; - -import org.apache.asterix.active.ActiveJob; -import org.apache.asterix.active.ActivityState; -import org.apache.asterix.active.EntityId; -import org.apache.asterix.external.feed.api.IFeedJoint; -import org.apache.asterix.external.feed.management.FeedConnectionId; -import org.apache.asterix.external.util.FeedUtils.JobType; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobSpecification; - -public class FeedConnectJobInfo extends ActiveJob { - - private static final long serialVersionUID = 1L; - private final FeedConnectionId connectionId; - private final Map<String, String> feedPolicy; - private final IFeedJoint sourceFeedJoint; - private IFeedJoint computeFeedJoint; - - private List<String> collectLocations; - private List<String> computeLocations; - private List<String> storageLocations; - private int partitionStarts = 0; - - public FeedConnectJobInfo(EntityId entityId, JobId jobId, ActivityState state, FeedConnectionId connectionId, - IFeedJoint sourceFeedJoint, IFeedJoint computeFeedJoint, JobSpecification spec, - Map<String, String> feedPolicy) { - super(entityId, jobId, state, JobType.FEED_CONNECT, spec); - this.connectionId = connectionId; - this.sourceFeedJoint = sourceFeedJoint; - this.computeFeedJoint = computeFeedJoint; - this.feedPolicy = feedPolicy; - } - - public FeedConnectionId getConnectionId() { - return connectionId; - } - - public List<String> getCollectLocations() { - return collectLocations; - } - - public List<String> getComputeLocations() { - return computeLocations; - } - - public List<String> getStorageLocations() { - return storageLocations; - } - - public void setCollectLocations(List<String> collectLocations) { - this.collectLocations = collectLocations; - } - - public void setComputeLocations(List<String> computeLocations) { - this.computeLocations = computeLocations; - } - - public void setStorageLocations(List<String> storageLocations) { - this.storageLocations = storageLocations; - } - - public IFeedJoint getSourceFeedJoint() { - return sourceFeedJoint; - } - - public IFeedJoint getComputeFeedJoint() { - return computeFeedJoint; - } - - public Map<String, String> getFeedPolicy() { - return feedPolicy; - } - - public void setComputeFeedJoint(IFeedJoint computeFeedJoint) { - this.computeFeedJoint = computeFeedJoint; - } - - public void partitionStart() { - partitionStarts++; - } - - public boolean collectionStarted() { - return partitionStarts == collectLocations.size(); - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedEventSubscriber.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedEventSubscriber.java new file mode 100644 index 0000000..0e931f7 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedEventSubscriber.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.feed.watch; + +import org.apache.asterix.active.ActiveEvent; +import org.apache.asterix.active.ActivityState; +import org.apache.asterix.active.IActiveEventSubscriber; +import org.apache.asterix.external.feed.management.FeedEventsListener; + +public class FeedEventSubscriber implements IActiveEventSubscriber { + + private final FeedEventsListener listener; + private final ActivityState state; + private boolean done = false; + + public FeedEventSubscriber(FeedEventsListener listener, ActivityState state) { + this.listener = listener; + this.state = state; + + } + + @Override + public synchronized void notify(ActiveEvent event) { + if (listener.getState() == state || listener.getState() == ActivityState.FAILED + || listener.getState() == ActivityState.STOPPED) { + done = true; + notifyAll(); + } + } + + @Override + public synchronized boolean done() { + return done; + } + + @Override + public synchronized void sync() throws InterruptedException { + while (!done) { + wait(); + } + } + + @Override + public synchronized void unsubscribe() { + done = true; + notifyAll(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java deleted file mode 100644 index 4114e82..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.feed.watch; - -import java.util.List; - -import org.apache.asterix.active.ActiveJob; -import org.apache.asterix.active.ActivityState; -import org.apache.asterix.active.EntityId; -import org.apache.asterix.external.feed.api.IFeedJoint; -import org.apache.asterix.external.util.FeedUtils.JobType; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobSpecification; - -public class FeedIntakeInfo extends ActiveJob { - - private static final long serialVersionUID = 1L; - private final EntityId feedId; - private final IFeedJoint intakeFeedJoint; - private final JobSpecification spec; - private List<String> intakeLocation; - - public FeedIntakeInfo(JobId jobId, ActivityState state, EntityId feedId, IFeedJoint intakeFeedJoint, - JobSpecification spec) { - super(feedId, jobId, state, JobType.INTAKE, spec); - this.feedId = feedId; - this.intakeFeedJoint = intakeFeedJoint; - this.spec = spec; - } - - public EntityId getFeedId() { - return feedId; - } - - public IFeedJoint getIntakeFeedJoint() { - return intakeFeedJoint; - } - - @Override - public JobSpecification getSpec() { - return spec; - } - - public List<String> getIntakeLocation() { - return intakeLocation; - } - - public void setIntakeLocation(List<String> intakeLocation) { - this.intakeLocation = intakeLocation; - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java new file mode 100644 index 0000000..9d8c570 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.feed.watch; + +import org.apache.asterix.active.ActiveEvent; +import org.apache.asterix.active.IActiveEventSubscriber; + +/** + * An event subscriber that does not listen to any events + */ +public class NoOpSubscriber implements IActiveEventSubscriber { + + public static final NoOpSubscriber INSTANCE = new NoOpSubscriber(); + + private NoOpSubscriber() { + } + + @Override + public void notify(ActiveEvent event) { + // do nothing + } + + @Override + public boolean done() { + return true; + } + + @Override + public void sync() { + // do nothing + } + + @Override + public void unsubscribe() { + // do nothing + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java index 570155c..4d8be98 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java @@ -48,6 +48,15 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<String> private Map<String, String> configuration; private transient AlgebricksAbsolutePartitionConstraint clusterLocations; + public static boolean isTwitterPull(Map<String, String> configuration) { + String reader = configuration.get(ExternalDataConstants.KEY_READER); + if (reader.equals(ExternalDataConstants.READER_TWITTER_PULL) + || reader.equals(ExternalDataConstants.READER_PULL_TWITTER)) { + return true; + } + return false; + } + @Override public DataSourceType getDataSourceType() { return DataSourceType.RECORDS; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java index 5337be1..6a581ef 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java @@ -20,6 +20,7 @@ package org.apache.asterix.external.input.stream.factory; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.ServerSocket; import java.util.ArrayList; import java.util.List; @@ -109,7 +110,8 @@ public class SocketServerInputStreamFactory implements IInputStreamFactory { try { Pair<String, Integer> socket = sockets.get(partition); ServerSocket server; - server = new ServerSocket(socket.second); + server = new ServerSocket(); + server.bind(new InetSocketAddress(socket.second)); return new SocketServerInputStream(server); } catch (IOException e) { throw new HyracksDataException(e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java index 4ecb887..c4cb650 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java @@ -20,11 +20,6 @@ package org.apache.asterix.external.operators; import java.util.Map; -import org.apache.asterix.active.ActiveManager; -import org.apache.asterix.active.ActiveRuntimeId; -import org.apache.asterix.active.EntityId; -import org.apache.asterix.common.api.IAppRuntimeContext; -import org.apache.asterix.external.feed.api.ISubscribableRuntime; import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; import org.apache.asterix.om.types.ARecordType; @@ -55,21 +50,17 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato /** Map representation of policy parameters */ private final Map<String, String> feedPolicyProperties; - /** The source feed from which the feed derives its data from. **/ - private final EntityId sourceFeedId; - /** The subscription location at which the recipient feed receives tuples from the source feed {SOURCE_FEED_INTAKE_STAGE , SOURCE_FEED_COMPUTE_STAGE} **/ private final FeedRuntimeType subscriptionLocation; - public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, - EntityId sourceFeedId, ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties, + public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, ARecordType atype, + RecordDescriptor rDesc, Map<String, String> feedPolicyProperties, FeedRuntimeType subscriptionLocation) { - super(spec, 0, 1); + super(spec, 1, 1); this.recordDescriptors[0] = rDesc; this.outputType = atype; this.connectionId = feedConnectionId; this.feedPolicyProperties = feedPolicyProperties; - this.sourceFeedId = sourceFeedId; this.subscriptionLocation = subscriptionLocation; } @@ -77,11 +68,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) throws HyracksDataException { - ActiveManager feedManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext() - .getApplicationContext().getApplicationObject()).getActiveManager(); - ActiveRuntimeId sourceRuntimeId = new ActiveRuntimeId(sourceFeedId, subscriptionLocation.toString(), partition); - ISubscribableRuntime sourceRuntime = (ISubscribableRuntime) feedManager.getRuntime(sourceRuntimeId); - return new FeedCollectOperatorNodePushable(ctx, connectionId, feedPolicyProperties, partition, sourceRuntime); + return new FeedCollectOperatorNodePushable(ctx, connectionId, feedPolicyProperties, partition); } public FeedConnectionId getFeedConnectionId() { @@ -100,10 +87,6 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato return recordDescriptors[0]; } - public EntityId getSourceFeedId() { - return sourceFeedId; - } - public FeedRuntimeType getSubscriptionLocation() { return subscriptionLocation; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java index d7fa590..384da84 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java @@ -18,46 +18,38 @@ */ package org.apache.asterix.external.operators; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.asterix.active.ActiveManager; import org.apache.asterix.active.ActiveRuntimeId; -import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.asterix.common.api.IAppRuntimeContext; -import org.apache.asterix.external.feed.api.ISubscribableRuntime; -import org.apache.asterix.external.feed.dataflow.FeedFrameCollector; import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler; import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler; import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; -import org.apache.asterix.external.feed.runtime.CollectionRuntime; import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; /** * The first operator in a collect job in a feed. */ -public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable { +public class FeedCollectOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable { private final int partition; private final FeedConnectionId connectionId; - private final Map<String, String> feedPolicy; private final FeedPolicyAccessor policyAccessor; private final ActiveManager activeManager; - private final ISubscribableRuntime sourceRuntime; private final IHyracksTaskContext ctx; - private CollectionRuntime collectRuntime; public FeedCollectOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId feedConnectionId, - Map<String, String> feedPolicy, int partition, ISubscribableRuntime sourceRuntime) { + Map<String, String> feedPolicy, int partition) { this.ctx = ctx; this.partition = partition; this.connectionId = feedConnectionId; - this.sourceRuntime = sourceRuntime; - this.feedPolicy = feedPolicy; this.policyAccessor = new FeedPolicyAccessor(feedPolicy); this.activeManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext() .getApplicationObject()).getActiveManager(); @@ -68,7 +60,6 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp try { ActiveRuntimeId runtimeId = new ActiveRuntimeId(connectionId.getFeedId(), FeedRuntimeType.COLLECT.toString(), partition); - // Does this collector have a handler? FrameTupleAccessor tAccessor = new FrameTupleAccessor(recordDesc); if (policyAccessor.bufferingEnabled()) { writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, policyAccessor, tAccessor, @@ -76,17 +67,33 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp } else { writer = new SyncFeedRuntimeInputHandler(ctx, writer, tAccessor); } - collectRuntime = new CollectionRuntime(connectionId, runtimeId, sourceRuntime, feedPolicy, ctx, - new FeedFrameCollector(policyAccessor, writer, connectionId)); - activeManager.registerRuntime(collectRuntime); - sourceRuntime.subscribe(collectRuntime); - // Notify CC that Collection started - ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(), - ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED), null); - collectRuntime.waitTillCollectionOver(); - activeManager.deregisterRuntime(collectRuntime.getRuntimeId()); } catch (Exception e) { throw new HyracksDataException(e); } } + + @Override + public void open() throws HyracksDataException { + writer.open(); + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + writer.nextFrame(buffer); + } + + @Override + public void fail() throws HyracksDataException { + writer.fail(); + } + + @Override + public void flush() throws HyracksDataException { + writer.flush(); + } + + @Override + public void close() throws HyracksDataException { + writer.close(); + } }
