http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 978c2eb..f111c54 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -42,10 +42,10 @@ import org.apache.asterix.active.ActiveJobNotificationHandler; import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveEntityEventsListener; +import org.apache.asterix.active.IActiveEventSubscriber; import org.apache.asterix.algebra.extension.IExtensionStatement; import org.apache.asterix.api.common.APIFramework; import org.apache.asterix.api.http.server.ApiServlet; -import org.apache.asterix.app.external.FeedJoint; import org.apache.asterix.app.result.ResultHandle; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.app.result.ResultUtil; @@ -60,31 +60,18 @@ import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.functions.FunctionSignature; +import org.apache.asterix.common.metadata.IDataset; import org.apache.asterix.common.utils.JobUtils; import org.apache.asterix.common.utils.JobUtils.ProgressState; +import org.apache.asterix.compiler.provider.AqlCompilationProvider; import org.apache.asterix.compiler.provider.ILangCompilationProvider; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber; -import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent; -import org.apache.asterix.external.feed.api.IFeed; -import org.apache.asterix.external.feed.api.IFeed.FeedType; -import org.apache.asterix.external.feed.api.IFeedJoint; -import org.apache.asterix.external.feed.api.IFeedJoint.FeedJointType; -import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber; import org.apache.asterix.external.feed.management.FeedConnectionId; -import org.apache.asterix.external.feed.management.FeedConnectionRequest; import org.apache.asterix.external.feed.management.FeedEventsListener; -import org.apache.asterix.external.feed.management.FeedJointKey; -import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; -import org.apache.asterix.external.feed.watch.FeedActivityDetails; -import org.apache.asterix.external.feed.watch.FeedConnectJobInfo; -import org.apache.asterix.external.feed.watch.FeedIntakeInfo; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.indexing.IndexingConstants; -import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; +import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.formats.nontagged.TypeTraitProvider; -import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement; import org.apache.asterix.lang.common.base.IReturningStatement; import org.apache.asterix.lang.common.base.IRewriterFactory; import org.apache.asterix.lang.common.base.IStatementRewriter; @@ -97,8 +84,6 @@ import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement; import org.apache.asterix.lang.common.statement.CreateFeedStatement; import org.apache.asterix.lang.common.statement.CreateFunctionStatement; import org.apache.asterix.lang.common.statement.CreateIndexStatement; -import org.apache.asterix.lang.common.statement.CreatePrimaryFeedStatement; -import org.apache.asterix.lang.common.statement.CreateSecondaryFeedStatement; import org.apache.asterix.lang.common.statement.DatasetDecl; import org.apache.asterix.lang.common.statement.DataverseDecl; import org.apache.asterix.lang.common.statement.DataverseDropStatement; @@ -121,11 +106,12 @@ import org.apache.asterix.lang.common.statement.Query; import org.apache.asterix.lang.common.statement.RefreshExternalDatasetStatement; import org.apache.asterix.lang.common.statement.RunStatement; import org.apache.asterix.lang.common.statement.SetStatement; +import org.apache.asterix.lang.common.statement.StartFeedStatement; +import org.apache.asterix.lang.common.statement.StopFeedStatement; import org.apache.asterix.lang.common.statement.TypeDecl; import org.apache.asterix.lang.common.statement.TypeDropStatement; import org.apache.asterix.lang.common.statement.WriteStatement; import org.apache.asterix.lang.common.struct.Identifier; -import org.apache.asterix.lang.common.util.FunctionUtil; import org.apache.asterix.metadata.IDatasetDetails; import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataManager; @@ -141,14 +127,13 @@ import org.apache.asterix.metadata.entities.Datatype; import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.metadata.entities.ExternalDatasetDetails; import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.entities.FeedConnection; import org.apache.asterix.metadata.entities.FeedPolicyEntity; import org.apache.asterix.metadata.entities.Function; import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.metadata.entities.NodeGroup; -import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies; import org.apache.asterix.metadata.feeds.FeedMetadataUtil; -import org.apache.asterix.metadata.feeds.FeedOperations; import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry; import org.apache.asterix.metadata.utils.ExternalIndexingOperations; @@ -165,11 +150,9 @@ import org.apache.asterix.om.types.TypeSignature; import org.apache.asterix.runtime.utils.AppContextInfo; import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory; import org.apache.asterix.translator.AbstractLangTranslator; -import org.apache.asterix.translator.CompiledStatements.CompiledConnectFeedStatement; import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement; import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement; import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement; import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement; import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement; import org.apache.asterix.translator.IStatementExecutor; @@ -177,13 +160,14 @@ import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.TypeTranslator; import org.apache.asterix.translator.util.ValidateUtil; import org.apache.asterix.utils.DataverseUtil; +import org.apache.asterix.utils.FeedOperations; import org.apache.asterix.utils.FlushDatasetUtil; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.algebricks.common.utils.Triple; import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind; import org.apache.hyracks.algebricks.data.IAWriterFactory; import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider; @@ -221,8 +205,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen protected final IStorageComponentProvider componentProvider; protected final ExecutorService executorService; - public QueryTranslator(List<Statement> statements, SessionConfig conf, ILangCompilationProvider compliationProvider, - IStorageComponentProvider componentProvider, ExecutorService executorService) { + public QueryTranslator(List<Statement> statements, SessionConfig conf, + ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider, + ExecutorService executorService) { this.statements = statements; this.sessionConfig = conf; this.componentProvider = componentProvider; @@ -349,8 +334,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen case Statement.Kind.DELETE: handleDeleteStatement(metadataProvider, stmt, hcc, false); break; - case Statement.Kind.CREATE_PRIMARY_FEED: - case Statement.Kind.CREATE_SECONDARY_FEED: + case Statement.Kind.CREATE_FEED: handleCreateFeedStatement(metadataProvider, stmt); break; case Statement.Kind.DROP_FEED: @@ -360,13 +344,16 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen handleDropFeedPolicyStatement(metadataProvider, stmt); break; case Statement.Kind.CONNECT_FEED: - handleConnectFeedStatement(metadataProvider, stmt, hcc); + handleConnectFeedStatement(metadataProvider, stmt); break; case Statement.Kind.DISCONNECT_FEED: - handleDisconnectFeedStatement(metadataProvider, stmt, hcc); + handleDisconnectFeedStatement(metadataProvider, stmt); break; - case Statement.Kind.SUBSCRIBE_FEED: - handleSubscribeFeedStatement(metadataProvider, stmt, hcc); + case Statement.Kind.START_FEED: + handleStartFeedStatement(metadataProvider, stmt, hcc); + break; + case Statement.Kind.STOP_FEED: + handleStopFeedStatement(metadataProvider, stmt); break; case Statement.Kind.CREATE_FEED_POLICY: handleCreateFeedPolicyStatement(metadataProvider, stmt); @@ -702,12 +689,11 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - protected void validateIfResourceIsActiveInFeed(String dataverseName, String datasetName) - throws CompilationException { + protected void validateIfResourceIsActiveInFeed(Dataset dataset) throws CompilationException { StringBuilder builder = null; IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); for (IActiveEntityEventsListener listener : listeners) { - if (listener.isEntityUsingDataset(dataverseName, datasetName)) { + if (listener.isEntityUsingDataset(dataset)) { if (builder == null) { builder = new StringBuilder(); } @@ -715,8 +701,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } if (builder != null) { - throw new CompilationException("Dataset " + dataverseName + "." + datasetName + " is currently being " - + "fed into by the following active entities.\n" + builder.toString()); + throw new CompilationException("Dataset " + dataset.getDataverseName() + "." + dataset.getDatasetName() + + " is currently being " + "fed into by the following active entities.\n" + builder.toString()); } } @@ -907,7 +893,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } if (ds.getDatasetType() == DatasetType.INTERNAL) { - validateIfResourceIsActiveInFeed(dataverseName, datasetName); + validateIfResourceIsActiveInFeed(ds); } else { // External dataset // Check if the dataset is indexible @@ -1204,11 +1190,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen EntityId activeEntityId = listener.getEntityId(); if (activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME) && activeEntityId.getDataverse().equals(dataverseName)) { - FeedEventsListener feedEventListener = (FeedEventsListener) listener; - FeedConnectionId[] connections = feedEventListener.getConnections(); - for (FeedConnectionId conn : connections) { - disconnectFeedBeforeDelete(dvId, activeEntityId, conn, metadataProvider, hcc); - } + stopFeedBeforeDelete(new Pair<>(dvId, new Identifier(activeEntityId.getEntityName())), + metadataProvider); // prepare job to remove feed log storage jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE .getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName()))); @@ -1316,20 +1299,16 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - protected void disconnectFeedBeforeDelete(Identifier dvId, EntityId activeEntityId, FeedConnectionId conn, - MetadataProvider metadataProvider, IHyracksClientConnection hcc) { - DisconnectFeedStatement disStmt = new DisconnectFeedStatement(dvId, - new Identifier(activeEntityId.getEntityName()), new Identifier(conn.getDatasetName())); + protected void stopFeedBeforeDelete(Pair<Identifier, Identifier> feedNameComp, MetadataProvider metadataProvider) { + StopFeedStatement disStmt = new StopFeedStatement(feedNameComp); try { - handleDisconnectFeedStatement(metadataProvider, disStmt, hcc); + handleStopFeedStatement(metadataProvider, disStmt); if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Disconnected feed " + activeEntityId.getEntityName() + " from dataset " - + conn.getDatasetName()); + LOGGER.info("Stopped feed " + feedNameComp.second.getValue()); } } catch (Exception exception) { if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Unable to disconnect feed " + activeEntityId.getEntityName() + " from dataset " - + conn.getDatasetName() + ". Encountered exception " + exception); + LOGGER.warning("Unable to stop feed " + feedNameComp.second.getValue() + exception); } } } @@ -1407,7 +1386,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // prepare job spec(s) that would disconnect any active feeds involving the dataset. IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); for (IActiveEntityEventsListener listener : activeListeners) { - if (listener.isEntityUsingDataset(dataverseName, datasetName) && listener.isEntityActive()) { + if (listener.isEntityUsingDataset(ds)) { throw new CompilationException( "Can't drop dataset since it is connected to active entity: " + listener.getEntityId()); } @@ -1523,7 +1502,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); StringBuilder builder = null; for (IActiveEntityEventsListener listener : listeners) { - if (listener.isEntityUsingDataset(dataverseName, datasetName)) { + if (listener.isEntityUsingDataset(ds)) { if (builder == null) { builder = new StringBuilder(); } @@ -1991,22 +1970,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw new AlgebricksException("A feed with this name " + feedName + " already exists."); } } - - switch (stmt.getKind()) { - case Statement.Kind.CREATE_PRIMARY_FEED: - CreatePrimaryFeedStatement cpfs = (CreatePrimaryFeedStatement) stmt; - String adaptorName = cpfs.getAdaptorName(); - feed = new Feed(dataverseName, feedName, cfs.getAppliedFunction(), FeedType.PRIMARY, feedName, - adaptorName, cpfs.getAdaptorConfiguration()); - break; - case Statement.Kind.CREATE_SECONDARY_FEED: - CreateSecondaryFeedStatement csfs = (CreateSecondaryFeedStatement) stmt; - feed = new Feed(dataverseName, feedName, csfs.getAppliedFunction(), FeedType.SECONDARY, - csfs.getSourceFeedName(), null, null); - break; - default: - throw new IllegalStateException(); - } + String adaptorName = cfs.getAdaptorName(); + feed = new Feed(dataverseName, feedName, adaptorName, cfs.getAdaptorConfiguration()); FeedMetadataUtil.validateFeed(feed, mdTxnCtx, metadataProvider.getLibraryManager()); MetadataManager.INSTANCE.addFeed(metadataProvider.getMetadataTxnContext(), feed); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -2103,12 +2068,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(feedId); if (listener != null) { - StringBuilder builder = new StringBuilder(); - for (FeedConnectionId connectionId : listener.getConnections()) { - builder.append(connectionId.getDatasetName() + "\n"); - } throw new AlgebricksException("Feed " + feedId - + " is currently active and connected to the following dataset(s) \n" + builder.toString()); + + " is currently active and connected to the following dataset(s) \n" + listener.toString()); } else { JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob( MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName())); @@ -2156,305 +2117,159 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - protected void handleConnectFeedStatement(MetadataProvider metadataProvider, Statement stmt, + private void handleStartFeedStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc) throws Exception { - ConnectFeedStatement cfs = (ConnectFeedStatement) stmt; - String dataverseName = getActiveDataverse(cfs.getDataverseName()); - String feedName = cfs.getFeedName(); - String datasetName = cfs.getDatasetName().getValue(); - boolean bActiveTxn = true; + StartFeedStatement sfs = (StartFeedStatement) stmt; + String dataverseName = getActiveDataverse(sfs.getDataverseName()); + String feedName = sfs.getFeedName().getValue(); + // Transcation handler MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - metadataProvider.disableBlockingOperator(); - boolean subscriberRegistered = false; - IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber(); - FeedConnectionId feedConnId = null; - EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, cfs.getFeedName()); + // Runtime handler + EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); + // Feed & Feed Connections + Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, + metadataProvider.getMetadataTxnContext()); + List<FeedConnection> feedConnections = MetadataManager.INSTANCE + .getFeedConections(metadataProvider.getMetadataTxnContext(), dataverseName, feedName); + ILangCompilationProvider compilationProvider = new AqlCompilationProvider(); + IStorageComponentProvider storageComponentProvider = new StorageComponentProvider(); + DefaultStatementExecutorFactory qtFactory = new DefaultStatementExecutorFactory(); FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId); - MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, dataverseName + "." + datasetName, - dataverseName + "." + feedName); + if (listener != null) { + throw new AlgebricksException("Feed " + feedName + " is started already."); + } + // Start try { - metadataProvider.setWriteTransaction(true); - CompiledConnectFeedStatement cbfs = new CompiledConnectFeedStatement(dataverseName, cfs.getFeedName(), - cfs.getDatasetName().getValue(), cfs.getPolicy(), cfs.getQuery(), cfs.getVarCounter()); - FeedMetadataUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(), - metadataProvider.getMetadataTxnContext()); - Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName(), - metadataProvider.getMetadataTxnContext()); - feedConnId = new FeedConnectionId(dataverseName, cfs.getFeedName(), cfs.getDatasetName().getValue()); - if (listener != null) { - subscriberRegistered = listener.isFeedConnectionActive(feedConnId, eventSubscriber); - } - if (subscriberRegistered) { - throw new CompilationException("Feed " + cfs.getFeedName() + " is already connected to dataset " - + cfs.getDatasetName().getValue()); - } - FeedPolicyEntity feedPolicy = - FeedMetadataUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(), mdTxnCtx); - // All Metadata checks have passed. Feed connect request is valid. // - if (listener == null) { - listener = new FeedEventsListener(entityId); - ActiveJobNotificationHandler.INSTANCE.registerListener(listener); - } - FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedPolicy.getProperties()); - Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple = - getFeedConnectionRequest(dataverseName, feed, cbfs.getDatasetName(), feedPolicy, mdTxnCtx); - FeedConnectionRequest connectionRequest = triple.first; - boolean createFeedIntakeJob = triple.second; - listener.registerFeedEventSubscriber(eventSubscriber); - subscriberRegistered = true; - if (createFeedIntakeJob) { - EntityId feedId = connectionRequest.getFeedJointKey().getFeedId(); - Feed primaryFeed = - MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName()); - Pair<JobSpecification, IAdapterFactory> pair = - FeedOperations.buildFeedIntakeJobSpec(primaryFeed, metadataProvider, policyAccessor); - // adapter configuration are valid at this stage - // register the feed joints (these are auto-de-registered) - int numOfPrividers = pair.second.getPartitionConstraint().getLocations().length; - for (IFeedJoint fj : triple.third) { - listener.registerFeedJoint(fj, numOfPrividers); - } - FeedIntakeInfo activeJob = new FeedIntakeInfo(null, ActivityState.ACTIVE, feed.getFeedId(), - triple.third.get(0), pair.first); - pair.first.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, activeJob); - JobUtils.runJob(hcc, pair.first, false); - eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_INTAKE_STARTED); - } else { - for (IFeedJoint fj : triple.third) { - listener.registerFeedJoint(fj, 0); - } - } - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - bActiveTxn = false; - eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_STARTED); - if (Boolean.valueOf(metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION))) { - eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_ENDED); // blocking call - } + MetadataLockManager.INSTANCE.startFeedBegin(dataverseName, dataverseName + "." + feedName, + feedConnections); + // Prepare policy + List<IDataset> datasets = new ArrayList<>(); + for (FeedConnection connection : feedConnections) { + datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, connection.getDataverseName(), + connection.getDatasetName())); + } + + org.apache.commons.lang3.tuple.Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo = + FeedOperations.buildStartFeedJob(sessionConfig, metadataProvider, feed, feedConnections, + compilationProvider, storageComponentProvider, qtFactory, hcc); + + JobSpecification feedJob = jobInfo.getLeft(); + listener = new FeedEventsListener(entityId, datasets, jobInfo.getRight().getLocations()); + ActiveJobNotificationHandler.INSTANCE.registerListener(listener); + IActiveEventSubscriber eventSubscriber = listener.subscribe(ActivityState.STARTED); + feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); + JobUtils.runJob(hcc, feedJob, + Boolean.valueOf(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION))); + eventSubscriber.sync(); + LOGGER.log(Level.INFO, "Submitted"); } catch (Exception e) { - if (bActiveTxn) { - abort(e, e, mdTxnCtx); + abort(e, e, mdTxnCtx); + if (listener != null) { + ActiveJobNotificationHandler.INSTANCE.unregisterListener(listener); } throw e; } finally { - MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName, - dataverseName + "." + feedName); - if (subscriberRegistered) { - listener.deregisterFeedEventSubscriber(eventSubscriber); - } + MetadataLockManager.INSTANCE.startFeedEnd(dataverseName, dataverseName + "." + feedName, feedConnections); } } - /** - * Generates a subscription request corresponding to a connect feed request. In addition, provides a boolean - * flag indicating if feed intake job needs to be started (source primary feed not found to be active). - * - * @param dataverse - * @param feed - * @param dataset - * @param feedPolicy - * @param mdTxnCtx - * @return - * @throws CompilationException - */ - protected Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> getFeedConnectionRequest(String dataverse, - Feed feed, String dataset, FeedPolicyEntity feedPolicy, MetadataTransactionContext mdTxnCtx) - throws CompilationException { - IFeedJoint sourceFeedJoint; - FeedConnectionRequest request; - List<String> functionsToApply = new ArrayList<>(); - boolean needIntakeJob = false; - List<IFeedJoint> jointsToRegister = new ArrayList<>(); - FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), dataset); - FeedRuntimeType connectionLocation; - FeedJointKey feedJointKey = getFeedJointKey(feed, mdTxnCtx); - EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverse, feed.getFeedName()); + private void handleStopFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { + StopFeedStatement sfst = (StopFeedStatement) stmt; + String dataverseName = getActiveDataverse(sfst.getDataverseName()); + String feedName = sfst.getFeedName().getValue(); + EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); + // Obtain runtime info from ActiveListener FeedEventsListener listener = - (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId); + (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(feedId); if (listener == null) { - throw new CompilationException("Feed Listener is not registered"); + throw new AlgebricksException("Feed " + feedName + " is not started."); } - - boolean isFeedJointAvailable = listener.isFeedJointAvailable(feedJointKey); - if (!isFeedJointAvailable) { - sourceFeedJoint = listener.getAvailableFeedJoint(feedJointKey); - if (sourceFeedJoint == null) { // the feed is currently not being ingested, i.e., it is unavailable. - connectionLocation = FeedRuntimeType.INTAKE; - EntityId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId - Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, sourceFeedId.getEntityName()); - FeedJointKey intakeFeedJointKey = new FeedJointKey(sourceFeedId, new ArrayList<>()); - sourceFeedJoint = new FeedJoint(intakeFeedJointKey, primaryFeed.getFeedId(), connectionLocation, - FeedJointType.INTAKE, connectionId); - jointsToRegister.add(sourceFeedJoint); - needIntakeJob = true; - } else { - connectionLocation = sourceFeedJoint.getConnectionLocation(); - } - - String[] functions = feedJointKey.getStringRep() - .substring(sourceFeedJoint.getFeedJointKey().getStringRep().length()).trim().split(":"); - for (String f : functions) { - if (f.trim().length() > 0) { - functionsToApply.add(f); - } - } - // register the compute feed point that represents the final output from the collection of - // functions that will be applied. - if (!functionsToApply.isEmpty()) { - FeedJointKey computeFeedJointKey = new FeedJointKey(feed.getFeedId(), functionsToApply); - IFeedJoint computeFeedJoint = new FeedJoint(computeFeedJointKey, feed.getFeedId(), - FeedRuntimeType.COMPUTE, FeedJointType.COMPUTE, connectionId); - jointsToRegister.add(computeFeedJoint); - } - } else { - sourceFeedJoint = listener.getFeedJoint(feedJointKey); - connectionLocation = sourceFeedJoint.getConnectionLocation(); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Feed joint " + sourceFeedJoint + " is available! need not apply any further computation"); - } - } - - request = new FeedConnectionRequest(sourceFeedJoint.getFeedJointKey(), connectionLocation, functionsToApply, - dataset, feedPolicy.getPolicyName(), feedPolicy.getProperties(), feed.getFeedId()); - - sourceFeedJoint.addConnectionRequest(request); - return new Triple<>(request, needIntakeJob, jointsToRegister); - } - - /* - * Gets the feed joint corresponding to the feed definition. Tuples constituting the feed are - * available at this feed joint. - */ - protected FeedJointKey getFeedJointKey(Feed feed, MetadataTransactionContext ctx) throws MetadataException { - Feed sourceFeed = feed; - List<String> appliedFunctions = new ArrayList<>(); - while (sourceFeed.getFeedType().equals(IFeed.FeedType.SECONDARY)) { - if (sourceFeed.getAppliedFunction() != null) { - appliedFunctions.add(0, sourceFeed.getAppliedFunction().getName()); - } - Feed parentFeed = - MetadataManager.INSTANCE.getFeed(ctx, feed.getDataverseName(), sourceFeed.getSourceFeedName()); - sourceFeed = parentFeed; - } - - if (sourceFeed.getAppliedFunction() != null) { - appliedFunctions.add(0, sourceFeed.getAppliedFunction().getName()); + IActiveEventSubscriber eventSubscriber = listener.subscribe(ActivityState.STOPPED); + // Transaction + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + metadataProvider.setMetadataTxnContext(mdTxnCtx); + MetadataLockManager.INSTANCE.StopFeedBegin(dataverseName, feedName); + try { + // validate + FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, mdTxnCtx); + // Construct ActiveMessage + for (int i = 0; i < listener.getSources().length; i++) { + String intakeLocation = listener.getSources()[i]; + FeedOperations.SendStopMessageToNode(feedId, intakeLocation, i); + } + eventSubscriber.sync(); + } catch (Exception e) { + abort(e, e, mdTxnCtx); + throw e; + } finally { + MetadataLockManager.INSTANCE.StopFeedEnd(dataverseName, feedName); } - - return new FeedJointKey(sourceFeed.getFeedId(), appliedFunctions); } - protected void handleDisconnectFeedStatement(MetadataProvider metadataProvider, Statement stmt, - IHyracksClientConnection hcc) throws Exception { - DisconnectFeedStatement cfs = (DisconnectFeedStatement) stmt; + private void handleConnectFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { + FeedConnection fc; + ConnectFeedStatement cfs = (ConnectFeedStatement) stmt; String dataverseName = getActiveDataverse(cfs.getDataverseName()); + String feedName = cfs.getFeedName(); String datasetName = cfs.getDatasetName().getValue(); + String policyName = cfs.getPolicy(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - boolean bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); - - FeedMetadataUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(), mdTxnCtx); - Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx); - EntityId entityId = new EntityId(Feed.EXTENSION_NAME, feed.getDataverseName(), feed.getFeedName()); - FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), cfs.getDatasetName().getValue()); - IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber(); - FeedEventsListener listener = - (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId); - if (listener == null || !listener.isEntityUsingDataset(dataverseName, datasetName)) { - throw new CompilationException("Feed " + feed.getFeedId().getEntityName() - + " is currently not connected to " + cfs.getDatasetName().getValue() + ". Invalid operation!"); - } - listener.registerFeedEventSubscriber(eventSubscriber); - MetadataLockManager.INSTANCE.disconnectFeedBegin(dataverseName, dataverseName + "." + datasetName, - dataverseName + "." + cfs.getFeedName()); + // validation + Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, + metadataProvider.getMetadataTxnContext()); + ARecordType outputType = FeedMetadataUtil.getOutputType(feed, feed.getAdapterConfiguration(), + ExternalDataConstants.KEY_TYPE_NAME); + List<FunctionSignature> appliedFunctions = cfs.getAppliedFunctions(); + // Transaction handling + MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, dataverseName + "." + datasetName, + dataverseName + "." + feedName); try { - Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), - dataverseName, cfs.getDatasetName().getValue()); - if (dataset == null) { - throw new CompilationException( - "Unknown dataset :" + cfs.getDatasetName().getValue() + " in dataverse " + dataverseName); - } - Pair<JobSpecification, Boolean> specDisconnectType = - FeedOperations.buildDisconnectFeedJobSpec(connectionId); - JobSpecification jobSpec = specDisconnectType.first; + fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(), dataverseName, + feedName, datasetName); + if (fc != null) { + throw new AlgebricksException("Feed" + feedName + " is already connected dataset " + datasetName); + } + fc = new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName, + outputType.toString()); + MetadataManager.INSTANCE.addFeedConnection(metadataProvider.getMetadataTxnContext(), fc); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - bActiveTxn = false; - JobUtils.runJob(hcc, jobSpec, true); - eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_ENDED); } catch (Exception e) { - if (bActiveTxn) { - abort(e, e, mdTxnCtx); - } + abort(e, e, mdTxnCtx); throw e; } finally { - MetadataLockManager.INSTANCE.disconnectFeedEnd(dataverseName, dataverseName + "." + datasetName, - dataverseName + "." + cfs.getFeedName()); + MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName, + dataverseName + "." + feedName); } } - protected void handleSubscribeFeedStatement(MetadataProvider metadataProvider, Statement stmt, - IHyracksClientConnection hcc) throws Exception { - - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Subscriber Feed Statement :" + stmt); - } - + protected void handleDisconnectFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { + DisconnectFeedStatement cfs = (DisconnectFeedStatement) stmt; + String dataverseName = getActiveDataverse(cfs.getDataverseName()); + String datasetName = cfs.getDatasetName().getValue(); + String feedName = cfs.getFeedName().getValue(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - boolean bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); - metadataProvider.setWriteTransaction(true); - metadataProvider.disableBlockingOperator(); - SubscribeFeedStatement bfs = (SubscribeFeedStatement) stmt; - bfs.initialize(metadataProvider.getMetadataTxnContext()); - - CompiledSubscribeFeedStatement csfs = - new CompiledSubscribeFeedStatement(bfs.getSubscriptionRequest(), bfs.getVarCounter()); - metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE); - metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, "" + bfs.getPolicy()); - metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS, - StringUtils.join(bfs.getLocations(), ',')); - - JobSpecification jobSpec = rewriteCompileQuery(hcc, metadataProvider, bfs.getQuery(), csfs); - FeedConnectionId feedConnectionId = new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(), - bfs.getSubscriptionRequest().getTargetDataset()); - String dataverse = feedConnectionId.getFeedId().getDataverse(); - String dataset = feedConnectionId.getDatasetName(); - MetadataLockManager.INSTANCE.subscribeFeedBegin(dataverse, dataverse + "." + dataset, - dataverse + "." + feedConnectionId.getFeedId().getEntityName()); + MetadataLockManager.INSTANCE.disconnectFeedBegin(dataverseName, dataverseName + "." + datasetName, + dataverseName + "." + cfs.getFeedName()); try { - JobSpecification alteredJobSpec = FeedMetadataUtil.alterJobSpecificationForFeed(jobSpec, feedConnectionId, - bfs.getSubscriptionRequest().getPolicyParameters()); - FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverse, bfs.getPolicy()); - if (policy == null) { - policy = BuiltinFeedPolicies.getFeedPolicy(bfs.getPolicy()); - if (policy == null) { - throw new AlgebricksException("Unknown feed policy:" + bfs.getPolicy()); - } - } - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - bActiveTxn = false; - - if (jobSpec != null) { - FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE - .getActiveEntityListener(bfs.getSubscriptionRequest().getReceivingFeedId()); - FeedConnectJobInfo activeJob = new FeedConnectJobInfo( - bfs.getSubscriptionRequest().getReceivingFeedId(), null, ActivityState.ACTIVE, - new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(), dataset), - listener.getSourceFeedJoint(), null, alteredJobSpec, policy.getProperties()); - alteredJobSpec.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, activeJob); - JobUtils.runJob(hcc, alteredJobSpec, false); - } - + FeedMetadataUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(), mdTxnCtx); + FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx); + FeedConnection fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(), + dataverseName, feedName, datasetName); + if (fc == null) { + throw new CompilationException("Feed " + feedName + " is currently not connected to " + + cfs.getDatasetName().getValue() + ". Invalid operation!"); + } + MetadataManager.INSTANCE.dropFeedConnection(mdTxnCtx, dataverseName, feedName, datasetName); } catch (Exception e) { - LOGGER.log(Level.WARNING, e.getMessage(), e); - if (bActiveTxn) { - abort(e, e, mdTxnCtx); - } + abort(e, e, mdTxnCtx); throw e; } finally { - MetadataLockManager.INSTANCE.subscribeFeedEnd(dataverse, dataverse + "." + dataset, - dataverse + "." + feedConnectionId.getFeedId().getEntityName()); + MetadataLockManager.INSTANCE.disconnectFeedEnd(dataverseName, dataverseName + "." + datasetName, + dataverseName + "." + cfs.getFeedName()); } }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java new file mode 100644 index 0000000..f8b5496 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.utils; + +import java.rmi.RemoteException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.asterix.active.ActiveRuntimeId; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.message.ActiveManagerMessage; +import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor; +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.utils.StoragePathUtil; +import org.apache.asterix.compiler.provider.ILangCompilationProvider; +import org.apache.asterix.external.api.IAdapterFactory; +import org.apache.asterix.external.feed.management.FeedConnectionId; +import org.apache.asterix.external.feed.management.FeedConnectionRequest; +import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; +import org.apache.asterix.external.feed.watch.FeedActivityDetails; +import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor; +import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor; +import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; +import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor; +import org.apache.asterix.external.util.FeedConstants; +import org.apache.asterix.external.util.FeedUtils; +import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; +import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement; +import org.apache.asterix.lang.common.base.Statement; +import org.apache.asterix.lang.common.statement.DataverseDecl; +import org.apache.asterix.lang.common.struct.Identifier; +import org.apache.asterix.lang.common.util.FunctionUtil; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.entities.FeedConnection; +import org.apache.asterix.metadata.entities.FeedPolicyEntity; +import org.apache.asterix.metadata.feeds.FeedMetadataUtil; +import org.apache.asterix.metadata.feeds.LocationConstraint; +import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; +import org.apache.asterix.runtime.job.listener.MultiTransactionJobletEventListenerFactory; +import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.asterix.runtime.utils.ClusterStateManager; +import org.apache.asterix.runtime.utils.RuntimeUtils; +import org.apache.asterix.translator.CompiledStatements; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.SessionConfig; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Triple; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; +import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.constraints.Constraint; +import org.apache.hyracks.api.constraints.PartitionConstraintHelper; +import org.apache.hyracks.api.constraints.expressions.ConstantExpression; +import org.apache.hyracks.api.constraints.expressions.ConstraintExpression; +import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression; +import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression; +import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression; +import org.apache.hyracks.api.dataflow.ConnectorDescriptorId; +import org.apache.hyracks.api.dataflow.IConnectorDescriptor; +import org.apache.hyracks.api.dataflow.IOperatorDescriptor; +import org.apache.hyracks.api.dataflow.OperatorDescriptorId; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileSplit; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor; +import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor; +import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; +import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor; +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; +import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor; +import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor; + +/** + * Provides helper method(s) for creating JobSpec for operations on a feed. + */ +public class FeedOperations { + + private FeedOperations() { + } + + private static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(Feed feed, + MetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception { + JobSpecification spec = RuntimeUtils.createJobSpecification(); + spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE); + IAdapterFactory adapterFactory; + IOperatorDescriptor feedIngestor; + AlgebricksPartitionConstraint ingesterPc; + Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t = + metadataProvider.buildFeedIntakeRuntime(spec, feed, policyAccessor); + feedIngestor = t.first; + ingesterPc = t.second; + adapterFactory = t.third; + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedIngestor, ingesterPc); + NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, ingesterPc); + spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, nullSink, 0); + spec.addRoot(nullSink); + return Pair.of(spec, adapterFactory); + } + + public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws AsterixException { + JobSpecification spec = RuntimeUtils.createJobSpecification(); + AlgebricksAbsolutePartitionConstraint allCluster = ClusterStateManager.INSTANCE.getClusterLocations(); + Set<String> nodes = new TreeSet<>(); + for (String node : allCluster.getLocations()) { + nodes.add(node); + } + AlgebricksAbsolutePartitionConstraint locations = + new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[nodes.size()])); + FileSplit[] feedLogFileSplits = + FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(), locations); + org.apache.hyracks.algebricks.common.utils.Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC = + StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits); + FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, spC.first, true); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, spC.second); + spec.addRoot(frod); + return spec; + } + + private static JobSpecification getConnectionJob(SessionConfig sessionConfig, MetadataProvider metadataProvider, + FeedConnection feedConnection, String[] locations, ILangCompilationProvider compilationProvider, + IStorageComponentProvider storageComponentProvider, DefaultStatementExecutorFactory qtFactory, + IHyracksClientConnection hcc) throws AlgebricksException, RemoteException, ACIDException { + DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(feedConnection.getDataverseName())); + FeedConnectionRequest fcr = + new FeedConnectionRequest(FeedRuntimeType.INTAKE, feedConnection.getAppliedFunctions(), + feedConnection.getDatasetName(), feedConnection.getPolicyName(), feedConnection.getFeedId()); + SubscribeFeedStatement subscribeStmt = new SubscribeFeedStatement(locations, fcr); + subscribeStmt.initialize(metadataProvider.getMetadataTxnContext()); + List<Statement> statements = new ArrayList<>(); + statements.add(dataverseDecl); + statements.add(subscribeStmt); + IStatementExecutor translator = + qtFactory.create(statements, sessionConfig, compilationProvider, storageComponentProvider); + // configure the metadata provider + metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE); + metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, "" + subscribeStmt.getPolicy()); + metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS, + StringUtils.join(subscribeStmt.getLocations(), ',')); + + CompiledStatements.CompiledSubscribeFeedStatement csfs = new CompiledStatements.CompiledSubscribeFeedStatement( + subscribeStmt.getSubscriptionRequest(), subscribeStmt.getVarCounter()); + return translator.rewriteCompileQuery(hcc, metadataProvider, subscribeStmt.getQuery(), csfs); + } + + private static JobSpecification combineIntakeCollectJobs(MetadataProvider metadataProvider, Feed feed, + JobSpecification intakeJob, List<JobSpecification> jobsList, List<FeedConnection> feedConnections, + String[] intakeLocations) throws AlgebricksException, HyracksDataException { + JobSpecification jobSpec = new JobSpecification(intakeJob.getFrameSize()); + + // copy ingestor + FeedIntakeOperatorDescriptor firstOp = + (FeedIntakeOperatorDescriptor) intakeJob.getOperatorMap().get(new OperatorDescriptorId(0)); + FeedIntakeOperatorDescriptor ingestionOp; + if (firstOp.getAdaptorFactory() == null) { + ingestionOp = new FeedIntakeOperatorDescriptor(jobSpec, feed, firstOp.getAdaptorLibraryName(), + firstOp.getAdaptorFactoryClassName(), firstOp.getAdapterOutputType(), firstOp.getPolicyAccessor(), + firstOp.getOutputRecordDescriptors()[0]); + } else { + ingestionOp = new FeedIntakeOperatorDescriptor(jobSpec, feed, firstOp.getAdaptorFactory(), + firstOp.getAdapterOutputType(), firstOp.getPolicyAccessor(), + firstOp.getOutputRecordDescriptors()[0]); + } + // create replicator + ReplicateOperatorDescriptor replicateOp = + new ReplicateOperatorDescriptor(jobSpec, ingestionOp.getOutputRecordDescriptors()[0], jobsList.size()); + jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), ingestionOp, 0, replicateOp, 0); + PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, ingestionOp, intakeLocations); + PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, replicateOp, intakeLocations); + // Loop over the jobs to copy operators and connections + Map<OperatorDescriptorId, OperatorDescriptorId> operatorIdMapping = new HashMap<>(); + Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorIdMapping = new HashMap<>(); + Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<>(); + Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>(); + List<JobId> jobIds = new ArrayList<>(); + + for (int iter1 = 0; iter1 < jobsList.size(); iter1++) { + FeedConnection curFeedConnection = feedConnections.get(iter1); + JobSpecification subJob = jobsList.get(iter1); + operatorIdMapping.clear(); + Map<OperatorDescriptorId, IOperatorDescriptor> operatorsMap = subJob.getOperatorMap(); + + FeedPolicyEntity feedPolicyEntity = + FeedMetadataUtil.validateIfPolicyExists(curFeedConnection.getDataverseName(), + curFeedConnection.getPolicyName(), metadataProvider.getMetadataTxnContext()); + + for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorsMap.entrySet()) { + IOperatorDescriptor opDesc = entry.getValue(); + OperatorDescriptorId oldId = opDesc.getOperatorId(); + OperatorDescriptorId opId; + if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor + && ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).isPrimary()) { + String operandId = ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName(); + FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(jobSpec, + new FeedConnectionId(ingestionOp.getEntityId(), + feedConnections.get(iter1).getDatasetName()), + opDesc, feedPolicyEntity.getProperties(), FeedRuntimeType.STORE, false, operandId); + opId = metaOp.getOperatorId(); + opDesc.setOperatorId(opId); + } else { + if (opDesc instanceof AlgebricksMetaOperatorDescriptor) { + AlgebricksMetaOperatorDescriptor algOp = (AlgebricksMetaOperatorDescriptor) opDesc; + for (IPushRuntimeFactory runtimeFactory : algOp.getPipeline().getRuntimeFactories()) { + if (runtimeFactory instanceof StreamSelectRuntimeFactory) { + ((StreamSelectRuntimeFactory) runtimeFactory).retainMissing(true, 0); + } + } + } + opId = jobSpec.createOperatorDescriptorId(opDesc); + } + operatorIdMapping.put(oldId, opId); + } + + // copy connectors + connectorIdMapping.clear(); + for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : subJob.getConnectorMap().entrySet()) { + IConnectorDescriptor connDesc = entry.getValue(); + ConnectorDescriptorId newConnId; + if (entry.getKey().getId() == 0) { + continue; + } + if (connDesc instanceof MToNPartitioningConnectorDescriptor) { + MToNPartitioningConnectorDescriptor m2nConn = (MToNPartitioningConnectorDescriptor) connDesc; + connDesc = new MToNPartitioningWithMessageConnectorDescriptor(jobSpec, + m2nConn.getTuplePartitionComputerFactory()); + newConnId = connDesc.getConnectorId(); + } else { + newConnId = jobSpec.createConnectorDescriptor(connDesc); + } + connectorIdMapping.put(entry.getKey(), newConnId); + } + + // make connections between operators + for (Entry<ConnectorDescriptorId, + Pair<Pair<IOperatorDescriptor, Integer>,Pair<IOperatorDescriptor, Integer>>> entry : + subJob.getConnectorOperatorMap().entrySet()) { + ConnectorDescriptorId newId = connectorIdMapping.get(entry.getKey()); + IConnectorDescriptor connDesc = jobSpec.getConnectorMap().get(newId); + Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft(); + Pair<IOperatorDescriptor, Integer> rightOp = entry.getValue().getRight(); + IOperatorDescriptor leftOpDesc = jobSpec.getOperatorMap().get(leftOp.getLeft().getOperatorId()); + IOperatorDescriptor rightOpDesc = jobSpec.getOperatorMap().get(rightOp.getLeft().getOperatorId()); + if (leftOp.getLeft() instanceof FeedCollectOperatorDescriptor) { + jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), replicateOp, iter1, leftOpDesc, + leftOp.getRight()); + jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), leftOpDesc, leftOp.getRight(), + rightOpDesc, rightOp.getRight()); + } else { + jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight()); + } + } + + // prepare for setting partition constraints + operatorLocations.clear(); + operatorCounts.clear(); + + for (Constraint constraint : subJob.getUserConstraints()) { + LValueConstraintExpression lexpr = constraint.getLValue(); + ConstraintExpression cexpr = constraint.getRValue(); + OperatorDescriptorId opId; + switch (lexpr.getTag()) { + case PARTITION_COUNT: + opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId(); + if (opId.getId() == 0) { + continue; + } + operatorCounts.put(operatorIdMapping.get(opId), (int) ((ConstantExpression) cexpr).getValue()); + break; + case PARTITION_LOCATION: + opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId(); + if (opId.getId() == 0) { + continue; + } + IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(operatorIdMapping.get(opId)); + List<LocationConstraint> locations = operatorLocations.get(opDesc.getOperatorId()); + if (locations == null) { + locations = new ArrayList<>(); + operatorLocations.put(opDesc.getOperatorId(), locations); + } + String location = (String) ((ConstantExpression) cexpr).getValue(); + LocationConstraint lc = + new LocationConstraint(location, ((PartitionLocationExpression) lexpr).getPartition()); + locations.add(lc); + break; + default: + break; + } + } + + // set absolute location constraints + for (Entry<OperatorDescriptorId, List<LocationConstraint>> entry : operatorLocations.entrySet()) { + IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(entry.getKey()); + // why do we need to sort? + Collections.sort(entry.getValue(), (LocationConstraint o1, LocationConstraint o2) -> { + return o1.partition - o2.partition; + }); + String[] locations = new String[entry.getValue().size()]; + for (int j = 0; j < locations.length; ++j) { + locations[j] = entry.getValue().get(j).location; + } + PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, opDesc, locations); + } + + // set count constraints + for (Entry<OperatorDescriptorId, Integer> entry : operatorCounts.entrySet()) { + IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(entry.getKey()); + if (!operatorLocations.keySet().contains(entry.getKey())) { + PartitionConstraintHelper.addPartitionCountConstraint(jobSpec, opDesc, entry.getValue()); + } + } + // roots + for (OperatorDescriptorId root : subJob.getRoots()) { + jobSpec.addRoot(jobSpec.getOperatorMap().get(operatorIdMapping.get(root))); + } + jobIds.add(((JobEventListenerFactory) subJob.getJobletEventListenerFactory()).getJobId()); + } + + // jobEventListenerFactory + jobSpec.setJobletEventListenerFactory(new MultiTransactionJobletEventListenerFactory(jobIds, true)); + // useConnectorSchedulingPolicy + jobSpec.setUseConnectorPolicyForScheduling(jobsList.get(0).isUseConnectorPolicyForScheduling()); + // connectorAssignmentPolicy + jobSpec.setConnectorPolicyAssignmentPolicy(jobsList.get(0).getConnectorPolicyAssignmentPolicy()); + return jobSpec; + } + + public static Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> buildStartFeedJob( + SessionConfig sessionConfig, MetadataProvider metadataProvider, Feed feed, + List<FeedConnection> feedConnections, ILangCompilationProvider compilationProvider, + IStorageComponentProvider storageComponentProvider, DefaultStatementExecutorFactory qtFactory, + IHyracksClientConnection hcc) throws Exception { + FeedPolicyAccessor fpa = new FeedPolicyAccessor(new HashMap<>()); + // TODO: Change the default Datasource to use all possible partitions + Pair<JobSpecification, IAdapterFactory> intakeInfo = buildFeedIntakeJobSpec(feed, metadataProvider, fpa); + //TODO: Add feed policy accessor + List<JobSpecification> jobsList = new ArrayList<>(); + // Construct the ingestion Job + JobSpecification intakeJob = intakeInfo.getLeft(); + IAdapterFactory ingestionAdaptorFactory = intakeInfo.getRight(); + String[] ingestionLocations = ingestionAdaptorFactory.getPartitionConstraint().getLocations(); + // Add connection job + for (FeedConnection feedConnection : feedConnections) { + JobSpecification connectionJob = getConnectionJob(sessionConfig, metadataProvider, feedConnection, + ingestionLocations, compilationProvider, storageComponentProvider, qtFactory, hcc); + jobsList.add(connectionJob); + } + return Pair.of(combineIntakeCollectJobs(metadataProvider, feed, intakeJob, jobsList, feedConnections, + ingestionLocations), intakeInfo.getRight().getPartitionConstraint()); + } + + public static void SendStopMessageToNode(EntityId feedId, String intakeNodeLocation, Integer partition) + throws Exception { + ActiveManagerMessage stopFeedMessage = new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY, "SRC", + new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition)); + SendActiveMessage(stopFeedMessage, intakeNodeLocation); + } + + private static void SendActiveMessage(ActiveManagerMessage activeManagerMessage, String nodeId) throws Exception { + ICCMessageBroker messageBroker = + (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker(); + messageBroker.sendApplicationMessageToNC(activeManagerMessage, nodeId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_dataset/metadata_dataset.1.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_dataset/metadata_dataset.1.adm b/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_dataset/metadata_dataset.1.adm index 390a955..cdaaf1b 100644 --- a/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_dataset/metadata_dataset.1.adm +++ b/asterixdb/asterix-app/src/test/resources/metadata/results/basic/metadata_dataset/metadata_dataset.1.adm @@ -1,13 +1,14 @@ -{ "DataverseName": "Metadata", "DatasetName": "CompactionPolicy", "DatatypeDataverseName": "Metadata", "DatatypeName": "CompactionPolicyRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "CompactionPolicy" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "CompactionPolicy" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 13, "PendingOp": 0 } -{ "DataverseName": "Metadata", "DatasetName": "Dataset", "DatatypeDataverseName": "Metadata", "DatatypeName": "DatasetRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "DatasetName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "DatasetName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 2, "PendingOp": 0 } -{ "DataverseName": "Metadata", "DatasetName": "DatasourceAdapter", "DatatypeDataverseName": "Metadata", "DatatypeName": "DatasourceAdapterRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "Name" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "Name" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 8, "PendingOp": 0 } -{ "DataverseName": "Metadata", "DatasetName": "Datatype", "DatatypeDataverseName": "Metadata", "DatatypeName": "DatatypeRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "DatatypeName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "DatatypeName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 3, "PendingOp": 0 } -{ "DataverseName": "Metadata", "DatasetName": "Dataverse", "DatatypeDataverseName": "Metadata", "DatatypeName": "DataverseRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ] ], "PrimaryKey": [ [ "DataverseName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 1, "PendingOp": 0 } -{ "DataverseName": "Metadata", "DatasetName": "ExternalFile", "DatatypeDataverseName": "Metadata", "DatatypeName": "ExternalFileRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "DatasetName" ], [ "FileNumber" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "DatasetName" ], [ "FileNumber" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 14, "PendingOp": 0 } -{ "DataverseName": "Metadata", "DatasetName": "Feed", "DatatypeDataverseName": "Metadata", "DatatypeName": "FeedRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "FeedName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "FeedName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 10, "PendingOp": 0 } -{ "DataverseName": "Metadata", "DatasetName": "FeedPolicy", "DatatypeDataverseName": "Metadata", "DatatypeName": "FeedPolicyRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "PolicyName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "PolicyName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 12, "PendingOp": 0 } -{ "DataverseName": "Metadata", "DatasetName": "Function", "DatatypeDataverseName": "Metadata", "DatatypeName": "FunctionRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "Name" ], [ "Arity" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "Name" ], [ "Arity" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 7, "PendingOp": 0 } -{ "DataverseName": "Metadata", "DatasetName": "Index", "DatatypeDataverseName": "Metadata", "DatatypeName": "IndexRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "DatasetName" ], [ "IndexName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "DatasetName" ], [ "IndexName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 4, "PendingOp": 0 } -{ "DataverseName": "Metadata", "DatasetName": "Library", "DatatypeDataverseName": "Metadata", "DatatypeName": "LibraryRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "Name" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "Name" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 9, "PendingOp": 0 } -{ "DataverseName": "Metadata", "DatasetName": "Node", "DatatypeDataverseName": "Metadata", "DatatypeName": "NodeRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "NodeName" ] ], "PrimaryKey": [ [ "NodeName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 5, "PendingOp": 0 } -{ "DataverseName": "Metadata", "DatasetName": "Nodegroup", "DatatypeDataverseName": "Metadata", "DatatypeName": "NodeGroupRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "GroupName" ] ], "PrimaryKey": [ [ "GroupName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Tue Jun 21 15:54:20 PDT 2016", "DatasetId": 6, "PendingOp": 0 } +{ "DataverseName": "Metadata", "DatasetName": "CompactionPolicy", "DatatypeDataverseName": "Metadata", "DatatypeName": "CompactionPolicyRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "CompactionPolicy" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "CompactionPolicy" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 13, "PendingOp": 0 } +{ "DataverseName": "Metadata", "DatasetName": "Dataset", "DatatypeDataverseName": "Metadata", "DatatypeName": "DatasetRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "DatasetName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "DatasetName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 2, "PendingOp": 0 } +{ "DataverseName": "Metadata", "DatasetName": "DatasourceAdapter", "DatatypeDataverseName": "Metadata", "DatatypeName": "DatasourceAdapterRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "Name" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "Name" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 8, "PendingOp": 0 } +{ "DataverseName": "Metadata", "DatasetName": "Datatype", "DatatypeDataverseName": "Metadata", "DatatypeName": "DatatypeRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "DatatypeName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "DatatypeName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 3, "PendingOp": 0 } +{ "DataverseName": "Metadata", "DatasetName": "Dataverse", "DatatypeDataverseName": "Metadata", "DatatypeName": "DataverseRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ] ], "PrimaryKey": [ [ "DataverseName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 1, "PendingOp": 0 } +{ "DataverseName": "Metadata", "DatasetName": "ExternalFile", "DatatypeDataverseName": "Metadata", "DatatypeName": "ExternalFileRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "DatasetName" ], [ "FileNumber" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "DatasetName" ], [ "FileNumber" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 14, "PendingOp": 0 } +{ "DataverseName": "Metadata", "DatasetName": "Feed", "DatatypeDataverseName": "Metadata", "DatatypeName": "FeedRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "FeedName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "FeedName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 10, "PendingOp": 0 } +{ "DataverseName": "Metadata", "DatasetName": "FeedConnection", "DatatypeDataverseName": "Metadata", "DatatypeName": "FeedConnectionRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "FeedName" ], [ "DatasetName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "FeedName" ], [ "DatasetName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 11, "PendingOp": 0 } +{ "DataverseName": "Metadata", "DatasetName": "FeedPolicy", "DatatypeDataverseName": "Metadata", "DatatypeName": "FeedPolicyRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "PolicyName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "PolicyName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 12, "PendingOp": 0 } +{ "DataverseName": "Metadata", "DatasetName": "Function", "DatatypeDataverseName": "Metadata", "DatatypeName": "FunctionRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "Name" ], [ "Arity" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "Name" ], [ "Arity" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 7, "PendingOp": 0 } +{ "DataverseName": "Metadata", "DatasetName": "Index", "DatatypeDataverseName": "Metadata", "DatatypeName": "IndexRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "DatasetName" ], [ "IndexName" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "DatasetName" ], [ "IndexName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 4, "PendingOp": 0 } +{ "DataverseName": "Metadata", "DatasetName": "Library", "DatatypeDataverseName": "Metadata", "DatatypeName": "LibraryRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "DataverseName" ], [ "Name" ] ], "PrimaryKey": [ [ "DataverseName" ], [ "Name" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 9, "PendingOp": 0 } +{ "DataverseName": "Metadata", "DatasetName": "Node", "DatatypeDataverseName": "Metadata", "DatatypeName": "NodeRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "NodeName" ] ], "PrimaryKey": [ [ "NodeName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 5, "PendingOp": 0 } +{ "DataverseName": "Metadata", "DatasetName": "Nodegroup", "DatatypeDataverseName": "Metadata", "DatatypeName": "NodeGroupRecordType", "DatasetType": "INTERNAL", "GroupName": "MetadataGroup", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "GroupName" ] ], "PrimaryKey": [ [ "GroupName" ] ], "Autogenerated": false }, "Hints": {{ }}, "Timestamp": "Fri Oct 21 10:29:21 PDT 2016", "DatasetId": 6, "PendingOp": 0 }
