Repository: asterixdb Updated Branches: refs/heads/master 692b8a890 -> fff200ca8
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java index 21db749..385f2bd 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java @@ -19,34 +19,21 @@ package org.apache.asterix.metadata.feeds; import java.rmi.RemoteException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.logging.Level; -import java.util.logging.Logger; import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.api.IDataSourceAdapter; import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType; import org.apache.asterix.external.feed.api.IFeed; -import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; -import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor; -import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor; import org.apache.asterix.external.provider.AdapterFactoryProvider; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; -import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataManager; @@ -56,34 +43,14 @@ import org.apache.asterix.metadata.entities.DatasourceAdapter; import org.apache.asterix.metadata.entities.Datatype; import org.apache.asterix.metadata.entities.Feed; import org.apache.asterix.metadata.entities.FeedPolicyEntity; -import org.apache.asterix.metadata.entities.Function; import org.apache.asterix.metadata.utils.MetadataConstants; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.IAType; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.algebricks.common.utils.Triple; -import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; -import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; -import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory; -import org.apache.hyracks.api.constraints.Constraint; -import org.apache.hyracks.api.constraints.PartitionConstraintHelper; -import org.apache.hyracks.api.constraints.expressions.ConstantExpression; -import org.apache.hyracks.api.constraints.expressions.ConstraintExpression; -import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression; -import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression; -import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression; -import org.apache.hyracks.api.dataflow.ConnectorDescriptorId; -import org.apache.hyracks.api.dataflow.IConnectorDescriptor; -import org.apache.hyracks.api.dataflow.IOperatorDescriptor; -import org.apache.hyracks.api.dataflow.OperatorDescriptorId; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor; -import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor; /** * A utility class for providing helper functions for feeds @@ -91,8 +58,6 @@ import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageCon */ public class FeedMetadataUtil { - private static final Logger LOGGER = Logger.getLogger(FeedMetadataUtil.class.getName()); - public static Dataset validateIfDatasetExists(String dataverse, String datasetName, MetadataTransactionContext ctx) throws CompilationException { Dataset dataset = MetadataManager.INSTANCE.getDataset(ctx, dataverse, datasetName); @@ -130,193 +95,6 @@ public class FeedMetadataUtil { return feedPolicy; } - public static JobSpecification alterJobSpecificationForFeed(JobSpecification spec, - FeedConnectionId feedConnectionId, Map<String, String> feedPolicyProperties) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Original Job Spec:" + spec); - } - - JobSpecification altered = new JobSpecification(spec.getFrameSize()); - Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap(); - boolean preProcessingRequired = preProcessingRequired(feedConnectionId); - // copy operators - Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<>(); - FeedMetaOperatorDescriptor metaOp; - for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorMap.entrySet()) { - String operandId = null; - IOperatorDescriptor opDesc = entry.getValue(); - if (opDesc instanceof FeedCollectOperatorDescriptor) { - FeedCollectOperatorDescriptor orig = (FeedCollectOperatorDescriptor) opDesc; - FeedCollectOperatorDescriptor fiop = new FeedCollectOperatorDescriptor(altered, - orig.getFeedConnectionId(), orig.getSourceFeedId(), (ARecordType) orig.getOutputType(), - orig.getRecordDescriptor(), orig.getFeedPolicyProperties(), orig.getSubscriptionLocation()); - oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId()); - } else if ((opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor) - && ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).isPrimary()) { - // only introduce store before primary index - operandId = ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName(); - metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties, - FeedRuntimeType.STORE, false, operandId); - oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId()); - } else { - FeedRuntimeType runtimeType; - boolean enableSubscriptionMode; - OperatorDescriptorId opId = null; - if (opDesc instanceof AlgebricksMetaOperatorDescriptor) { - IPushRuntimeFactory[] runtimeFactories = ((AlgebricksMetaOperatorDescriptor) opDesc).getPipeline() - .getRuntimeFactories(); - if (runtimeFactories[0] instanceof AssignRuntimeFactory && runtimeFactories.length > 1) { - IConnectorDescriptor connectorDesc = spec.getOperatorInputMap().get(opDesc.getOperatorId()) - .get(0); - IOperatorDescriptor sourceOp = spec.getProducer(connectorDesc); - if (sourceOp instanceof FeedCollectOperatorDescriptor) { - runtimeType = FeedRuntimeType.COMPUTE; - enableSubscriptionMode = preProcessingRequired; - metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, - feedPolicyProperties, runtimeType, enableSubscriptionMode, operandId); - opId = metaOp.getOperatorId(); - } - } - } - if (opId == null) { - opId = altered.createOperatorDescriptorId(opDesc); - } - oldNewOID.put(opDesc.getOperatorId(), opId); - } - } - - // copy connectors - Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorMapping = new HashMap<>(); - for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()) { - IConnectorDescriptor connDesc = entry.getValue(); - ConnectorDescriptorId newConnId; - if (connDesc instanceof MToNPartitioningConnectorDescriptor) { - MToNPartitioningConnectorDescriptor m2nConn = (MToNPartitioningConnectorDescriptor) connDesc; - connDesc = new MToNPartitioningWithMessageConnectorDescriptor(altered, - m2nConn.getTuplePartitionComputerFactory()); - newConnId = connDesc.getConnectorId(); - } else { - newConnId = altered.createConnectorDescriptor(connDesc); - } - connectorMapping.put(entry.getKey(), newConnId); - } - - // make connections between operators - for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, - Pair<IOperatorDescriptor, Integer>>> entry : spec.getConnectorOperatorMap().entrySet()) { - IConnectorDescriptor connDesc = altered.getConnectorMap().get(connectorMapping.get(entry.getKey())); - Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft(); - Pair<IOperatorDescriptor, Integer> rightOp = entry.getValue().getRight(); - - IOperatorDescriptor leftOpDesc = altered.getOperatorMap() - .get(oldNewOID.get(leftOp.getLeft().getOperatorId())); - IOperatorDescriptor rightOpDesc = altered.getOperatorMap() - .get(oldNewOID.get(rightOp.getLeft().getOperatorId())); - - altered.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight()); - } - - // prepare for setting partition constraints - Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<>(); - Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>(); - - for (Constraint constraint : spec.getUserConstraints()) { - LValueConstraintExpression lexpr = constraint.getLValue(); - ConstraintExpression cexpr = constraint.getRValue(); - OperatorDescriptorId opId; - switch (lexpr.getTag()) { - case PARTITION_COUNT: - opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId(); - operatorCounts.put(opId, (int) ((ConstantExpression) cexpr).getValue()); - break; - case PARTITION_LOCATION: - opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId(); - - IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(opId)); - List<LocationConstraint> locations = operatorLocations.get(opDesc.getOperatorId()); - if (locations == null) { - locations = new ArrayList<>(); - operatorLocations.put(opDesc.getOperatorId(), locations); - } - String location = (String) ((ConstantExpression) cexpr).getValue(); - LocationConstraint lc = new LocationConstraint(location, - ((PartitionLocationExpression) lexpr).getPartition()); - locations.add(lc); - break; - default: - break; - } - } - - // set absolute location constraints - for (Entry<OperatorDescriptorId, List<LocationConstraint>> entry : operatorLocations.entrySet()) { - IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey())); - Collections.sort(entry.getValue(), (LocationConstraint o1, LocationConstraint o2) -> { - return o1.partition - o2.partition; - }); - String[] locations = new String[entry.getValue().size()]; - for (int i = 0; i < locations.length; ++i) { - locations[i] = entry.getValue().get(i).location; - } - PartitionConstraintHelper.addAbsoluteLocationConstraint(altered, opDesc, locations); - } - - // set count constraints - for (Entry<OperatorDescriptorId, Integer> entry : operatorCounts.entrySet()) { - IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey())); - if (!operatorLocations.keySet().contains(entry.getKey())) { - PartitionConstraintHelper.addPartitionCountConstraint(altered, opDesc, entry.getValue()); - } - } - - // useConnectorSchedulingPolicy - altered.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling()); - - // connectorAssignmentPolicy - altered.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy()); - - // roots - for (OperatorDescriptorId root : spec.getRoots()) { - altered.addRoot(altered.getOperatorMap().get(oldNewOID.get(root))); - } - - // jobEventListenerFactory - altered.setJobletEventListenerFactory(spec.getJobletEventListenerFactory()); - - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("New Job Spec:" + altered); - } - - return altered; - - } - - private static boolean preProcessingRequired(FeedConnectionId connectionId) { - MetadataTransactionContext ctx = null; - Feed feed = null; - boolean preProcessingRequired = false; - try { - MetadataManager.INSTANCE.acquireReadLatch(); - ctx = MetadataManager.INSTANCE.beginTransaction(); - feed = MetadataManager.INSTANCE.getFeed(ctx, connectionId.getFeedId().getDataverse(), - connectionId.getFeedId().getEntityName()); - preProcessingRequired = feed.getAppliedFunction() != null; - MetadataManager.INSTANCE.commitTransaction(ctx); - } catch (Exception e) { - if (ctx != null) { - try { - MetadataManager.INSTANCE.abortTransaction(ctx); - } catch (Exception abortException) { - e.addSuppressed(abortException); - throw new IllegalStateException(e); - } - } - } finally { - MetadataManager.INSTANCE.releaseReadLatch(); - } - return preProcessingRequired; - } - public static void validateFeed(Feed feed, MetadataTransactionContext mdTxnCtx, ILibraryManager libraryManager) throws AsterixException { try { @@ -537,32 +315,4 @@ public class FeedMetadataUtil { } return outputType; } - - public static String getSecondaryFeedOutput(Feed feed, FeedPolicyAccessor policyAccessor, - MetadataTransactionContext mdTxnCtx) - throws AlgebricksException, MetadataException, RemoteException, ACIDException { - String outputType = null; - String primaryFeedName = feed.getSourceFeedName(); - Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feed.getDataverseName(), primaryFeedName); - FunctionSignature appliedFunction = primaryFeed.getAppliedFunction(); - if (appliedFunction == null) { - outputType = getOutputType(feed, feed.getAdapterConfiguration(), ExternalDataConstants.KEY_TYPE_NAME) - .getDisplayName(); - } else { - Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction); - if (function != null) { - if (function.getLanguage().equals(Function.LANGUAGE_AQL)) { - throw new NotImplementedException( - "Secondary feeds derived from a source feed that has an applied AQL function" - + " are not supported yet."); - } else { - outputType = function.getReturnType(); - } - } else { - throw new IllegalArgumentException( - "Function " + appliedFunction + " associated with source feed not found in Metadata."); - } - } - return outputType; - } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedOperations.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedOperations.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedOperations.java deleted file mode 100644 index 00c9010..0000000 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedOperations.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.metadata.feeds; - -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.asterix.active.ActiveJobNotificationHandler; -import org.apache.asterix.active.EntityId; -import org.apache.asterix.active.IActiveMessage; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.utils.StoragePathUtil; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.feed.api.IFeedJoint; -import org.apache.asterix.external.feed.management.FeedConnectionId; -import org.apache.asterix.external.feed.management.FeedEventsListener; -import org.apache.asterix.external.feed.message.EndFeedMessage; -import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; -import org.apache.asterix.external.feed.watch.FeedConnectJobInfo; -import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor; -import org.apache.asterix.external.util.FeedConstants; -import org.apache.asterix.external.util.FeedUtils; -import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; -import org.apache.asterix.metadata.declared.MetadataProvider; -import org.apache.asterix.metadata.entities.Feed; -import org.apache.asterix.runtime.utils.ClusterStateManager; -import org.apache.asterix.runtime.utils.RuntimeUtils; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.algebricks.common.utils.Triple; -import org.apache.hyracks.api.dataflow.IOperatorDescriptor; -import org.apache.hyracks.api.io.FileSplit; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; -import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor; - -/** - * Provides helper method(s) for creating JobSpec for operations on a feed. - */ -public class FeedOperations { - - private FeedOperations() { - } - - /** - * Builds the job spec for ingesting a (primary) feed from its external source via the feed adaptor. - * - * @param primaryFeed - * @param metadataProvider - * @return JobSpecification the Hyracks job specification for receiving data from external source - * @throws Exception - */ - public static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(Feed primaryFeed, - MetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception { - JobSpecification spec = RuntimeUtils.createJobSpecification(); - spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE); - IAdapterFactory adapterFactory; - IOperatorDescriptor feedIngestor; - AlgebricksPartitionConstraint ingesterPc; - Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t = - metadataProvider.buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor); - feedIngestor = t.first; - ingesterPc = t.second; - adapterFactory = t.third; - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedIngestor, ingesterPc); - NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, ingesterPc); - spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, nullSink, 0); - spec.addRoot(nullSink); - return new Pair<>(spec, adapterFactory); - } - - /** - * Builds the job spec for sending message to an active feed to disconnect it from the - * its source. - */ - public static Pair<JobSpecification, Boolean> buildDisconnectFeedJobSpec(FeedConnectionId connectionId) - throws AlgebricksException { - - JobSpecification spec = RuntimeUtils.createJobSpecification(); - IOperatorDescriptor feedMessenger; - AlgebricksPartitionConstraint messengerPc; - List<String> locations = null; - FeedRuntimeType sourceRuntimeType; - try { - FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE - .getActiveEntityListener(connectionId.getFeedId()); - FeedConnectJobInfo cInfo = listener.getFeedConnectJobInfo(connectionId); - IFeedJoint sourceFeedJoint = cInfo.getSourceFeedJoint(); - IFeedJoint computeFeedJoint = cInfo.getComputeFeedJoint(); - - boolean terminateIntakeJob = false; - boolean completeDisconnect = computeFeedJoint == null || computeFeedJoint.getReceivers().isEmpty(); - if (completeDisconnect) { - sourceRuntimeType = FeedRuntimeType.INTAKE; - locations = cInfo.getCollectLocations(); - terminateIntakeJob = sourceFeedJoint.getReceivers().size() == 1; - } else { - locations = cInfo.getComputeLocations(); - sourceRuntimeType = FeedRuntimeType.COMPUTE; - } - - Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDisconnectFeedMessengerRuntime(spec, - connectionId, locations, sourceRuntimeType, completeDisconnect, sourceFeedJoint.getOwnerFeedId()); - - feedMessenger = p.first; - messengerPc = p.second; - - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc); - NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc); - spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0); - spec.addRoot(nullSink); - return new Pair<>(spec, terminateIntakeJob); - - } catch (AlgebricksException e) { - throw new AsterixException(e); - } - - } - - private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildSendFeedMessageRuntime( - JobSpecification jobSpec, FeedConnectionId feedConenctionId, IActiveMessage feedMessage, - Collection<String> locations) throws AlgebricksException { - AlgebricksPartitionConstraint partitionConstraint = - new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {})); - FeedMessageOperatorDescriptor feedMessenger = - new FeedMessageOperatorDescriptor(jobSpec, feedConenctionId, feedMessage); - return new Pair<>(feedMessenger, partitionConstraint); - } - - private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime( - JobSpecification jobSpec, FeedConnectionId feedConenctionId, List<String> locations, - FeedRuntimeType sourceFeedRuntimeType, boolean completeDisconnection, EntityId sourceFeedId) - throws AlgebricksException { - IActiveMessage feedMessage = new EndFeedMessage(feedConenctionId, sourceFeedRuntimeType, sourceFeedId, - completeDisconnection, EndFeedMessage.EndMessageType.DISCONNECT_FEED); - return buildSendFeedMessageRuntime(jobSpec, feedConenctionId, feedMessage, locations); - } - - public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws AsterixException { - JobSpecification spec = RuntimeUtils.createJobSpecification(); - AlgebricksAbsolutePartitionConstraint allCluster = ClusterStateManager.INSTANCE.getClusterLocations(); - Set<String> nodes = new TreeSet<>(); - for (String node : allCluster.getLocations()) { - nodes.add(node); - } - AlgebricksAbsolutePartitionConstraint locations = - new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[nodes.size()])); - FileSplit[] feedLogFileSplits = - FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(), locations); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits); - FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first, true); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, splitsAndConstraint.second); - spec.addRoot(frod); - return spec; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java index a9c721a..81eaa9b 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java @@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.asterix.metadata.entities.Dataverse; +import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.entities.FeedConnection; public class MetadataLockManager { @@ -480,6 +482,33 @@ public class MetadataLockManager { releaseDataverseReadLock(dataverseName); } + public void startFeedBegin(String dataverseName, String feedName, List<FeedConnection> feedConnections) { + acquireDataverseReadLock(dataverseName); + acquireFeedReadLock(feedName); + for (FeedConnection feedConnection : feedConnections) { + acquireDatasetReadLock(dataverseName + "." + feedConnection.getDatasetName()); + } + } + + public void startFeedEnd(String dataverseName, String feedName, List<FeedConnection> feedConnections) { + releaseDataverseReadLock(dataverseName); + releaseFeedReadLock(feedName); + for (FeedConnection feedConnection : feedConnections) { + releaseDatasetReadLock(dataverseName + "." + feedConnection.getDatasetName()); + } + } + + public void StopFeedBegin(String dataverseName, String feedName) { + // TODO: dataset lock? + acquireDataverseReadLock(dataverseName); + acquireFeedReadLock(feedName); + } + + public void StopFeedEnd(String dataverseName, String feedName) { + releaseDataverseReadLock(dataverseName); + releaseFeedReadLock(feedName); + } + public void createFeedBegin(String dataverseName, String feedFullyQualifiedName) { acquireDataverseReadLock(dataverseName); acquireFeedWriteLock(feedFullyQualifiedName); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql index df6c9b6..565d61b 100644 --- a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql +++ b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql @@ -19,8 +19,8 @@ /* * Description : Create a feed dataset that uses the feed simulator adapter. The feed simulator simulates feed from a file in the local fs. - Associate with the feed an external user-defined function. The UDF - finds topics in each tweet. A topic is identified by a #. + Associate with the feed an external user-defined function. The UDF + finds topics in each tweet. A topic is identified by a #. Begin ingestion and apply external user defined function * Expected Res : Success * Date : 23rd Apr 2013 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql index 39a6272..3be4db8 100644 --- a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql +++ b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql @@ -19,8 +19,8 @@ /* * Description : Create a feed dataset that uses the feed simulator adapter. The feed simulator simulates feed from a file in the local fs. - Associate with the feed an external user-defined function. The UDF - finds topics in each tweet. A topic is identified by a #. + Associate with the feed an external user-defined function. The UDF + finds topics in each tweet. A topic is identified by a #. Begin ingestion and apply external user defined function * Expected Res : Success * Date : 23rd Apr 2013 @@ -30,3 +30,5 @@ use dataverse externallibtest; set wait-for-completion-feed "true"; connect feed TestTypedAdapterFeed to dataset TweetsTestAdapter; + +start feed TestTypedAdapterFeed; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql index 3df9d2b..2860f17 100644 --- a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql +++ b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql @@ -19,8 +19,8 @@ /* * Description : Create a feed dataset that uses the feed simulator adapter. The feed simulator simulates feed from a file in the local fs. - Associate with the feed an external user-defined function. The UDF - finds topics in each tweet. A topic is identified by a #. + Associate with the feed an external user-defined function. The UDF + finds topics in each tweet. A topic is identified by a #. Begin ingestion and apply external user defined function * Expected Res : Success * Date : 23rd Apr 2013 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql index 35125a1..6e84ea3 100644 --- a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql +++ b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql @@ -19,8 +19,8 @@ /* * Description : Create a feed dataset that uses the feed simulator adapter. The feed simulator simulates feed from a file in the local fs. - Associate with the feed an external user-defined function. The UDF - finds topics in each tweet. A topic is identified by a #. + Associate with the feed an external user-defined function. The UDF + finds topics in each tweet. A topic is identified by a #. Begin ingestion and apply external user defined function * Expected Res : Success * Date : 23rd Apr 2013 @@ -46,8 +46,9 @@ create type TweetOutputType as closed { create feed TweetFeed using file_feed -(("type-name"="TweetInputType"),("fs"="localfs"),("path"="127.0.0.1://../../../../../../asterix-app/data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10")) -apply function testlib#parseTweet; +(("type-name"="TweetInputType"),("fs"="localfs"), +("path"="127.0.0.1://../../../../../../asterix-app/data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10")); -create dataset TweetsFeedIngest(TweetOutputType) + +create dataset TweetsFeedIngest(TweetOutputType) primary key id; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql index d5a6f58..db68138 100644 --- a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql +++ b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql @@ -19,8 +19,8 @@ /* * Description : Create a feed dataset that uses the feed simulator adapter. The feed simulator simulates feed from a file in the local fs. - Associate with the feed an external user-defined function. The UDF - finds topics in each tweet. A topic is identified by a #. + Associate with the feed an external user-defined function. The UDF + finds topics in each tweet. A topic is identified by a #. Begin ingestion and apply external user defined function * Expected Res : Success * Date : 23rd Apr 2013 @@ -29,4 +29,5 @@ use dataverse externallibtest; set wait-for-completion-feed "true"; -connect feed TweetFeed to dataset TweetsFeedIngest; +connect feed TweetFeed to dataset TweetsFeedIngesta pply function testlib#parseTweet; +start feed TweetFeed; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql index b188b52..22d1d27 100644 --- a/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql +++ b/asterixdb/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql @@ -19,8 +19,8 @@ /* * Description : Create a feed dataset that uses the feed simulator adapter. The feed simulator simulates feed from a file in the local fs. - Associate with the feed an external user-defined function. The UDF - finds topics in each tweet. A topic is identified by a #. + Associate with the feed an external user-defined function. The UDF + finds topics in each tweet. A topic is identified by a #. Begin ingestion and apply external user defined function * Expected Res : Success * Date : 23rd Apr 2013
