http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java index 5f478c3..ab8c8f7 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java @@ -28,7 +28,6 @@ import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.feed.api.IFeed; -import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; import org.apache.asterix.om.types.ARecordType; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -45,6 +44,8 @@ import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescri */ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { + private static final String FEED_EXTENSION_NAME = "Feed"; + private static final long serialVersionUID = 1L; private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName()); @@ -53,29 +54,23 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator private final EntityId feedId; private final FeedPolicyAccessor policyAccessor; - + private final ARecordType adapterOutputType; /** The adaptor factory that is used to create an instance of the feed adaptor **/ private IAdapterFactory adaptorFactory; - /** The library that contains the adapter in use. **/ private String adaptorLibraryName; - /** * The adapter factory class that is used to create an instance of the feed adapter. * This value is used only in the case of external adapters. **/ private String adaptorFactoryClassName; - /** The configuration parameters associated with the adapter. **/ private Map<String, String> adaptorConfiguration; - private final ARecordType adapterOutputType; - public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, IAdapterFactory adapterFactory, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor, RecordDescriptor rDesc) { super(spec, 0, 1); - this.feedId = new EntityId(FeedConnectionId.FEED_EXTENSION_NAME, primaryFeed.getDataverseName(), - primaryFeed.getFeedName()); + this.feedId = new EntityId(FEED_EXTENSION_NAME, primaryFeed.getDataverseName(), primaryFeed.getFeedName()); this.adaptorFactory = adapterFactory; this.adapterOutputType = adapterOutputType; this.policyAccessor = policyAccessor; @@ -86,8 +81,7 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator String adapterFactoryClassName, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor, RecordDescriptor rDesc) { super(spec, 0, 1); - this.feedId = new EntityId(FeedConnectionId.FEED_EXTENSION_NAME, primaryFeed.getDataverseName(), - primaryFeed.getFeedName()); + this.feedId = new EntityId(FEED_EXTENSION_NAME, primaryFeed.getDataverseName(), primaryFeed.getFeedName()); this.adaptorFactoryClassName = adapterFactoryClassName; this.adaptorLibraryName = adapterLibraryName; this.adaptorConfiguration = primaryFeed.getAdapterConfiguration(); @@ -108,8 +102,8 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator private IAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx) throws HyracksDataException { IAdapterFactory adapterFactory; - IAppRuntimeContext runtimeCtx = - (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject(); + IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) ctx.getJobletContext() + .getApplicationContext().getApplicationObject(); ILibraryManager libraryManager = runtimeCtx.getLibraryManager(); ClassLoader classLoader = libraryManager.getLibraryClassLoader(feedId.getDataverse(), adaptorLibraryName); if (classLoader != null) { @@ -130,8 +124,32 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator return adapterFactory; } - public EntityId getFeedId() { + public EntityId getEntityId() { return feedId; } + public IAdapterFactory getAdaptorFactory() { + return this.adaptorFactory; + } + + public void setAdaptorFactory(IAdapterFactory factory) { + this.adaptorFactory = factory; + } + + public ARecordType getAdapterOutputType() { + return this.adapterOutputType; + } + + public FeedPolicyAccessor getPolicyAccessor() { + return this.policyAccessor; + } + + public String getAdaptorLibraryName() { + return this.adaptorLibraryName; + } + + public String getAdaptorFactoryClassName() { + return this.adaptorFactoryClassName; + } + }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java index f58e9e5..99fff19 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java @@ -18,84 +18,58 @@ */ package org.apache.asterix.external.operators; -import org.apache.asterix.active.ActiveManager; import org.apache.asterix.active.ActiveRuntimeId; +import org.apache.asterix.active.ActiveSourceOperatorNodePushable; import org.apache.asterix.active.EntityId; -import org.apache.asterix.active.message.ActivePartitionMessage; -import org.apache.asterix.common.api.IAppRuntimeContext; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.dataset.adapter.FeedAdapter; -import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager; -import org.apache.asterix.external.feed.runtime.IngestionRuntime; -import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; +import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; +import org.apache.hyracks.api.util.HyracksConstants; +import org.apache.hyracks.dataflow.common.utils.TaskUtil; /** * The runtime for @see{FeedIntakeOperationDescriptor}. * Provides the core functionality to set up the artifacts for ingestion of a feed. * The artifacts are lazily activated when a feed receives a subscription request. */ -public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable { +public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePushable { - private final EntityId feedId; private final int partition; - private final IHyracksTaskContext ctx; private final IAdapterFactory adapterFactory; private final FeedIntakeOperatorDescriptor opDesc; + private volatile AdapterRuntimeManager adapterRuntimeManager; public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, EntityId feedId, IAdapterFactory adapterFactory, int partition, FeedPolicyAccessor policyAccessor, IRecordDescriptorProvider recordDescProvider, FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) { + super(ctx, new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition)); this.opDesc = feedIntakeOperatorDescriptor; this.recordDesc = recordDescProvider.getOutputRecordDescriptor(opDesc.getActivityId(), 0); - this.ctx = ctx; - this.feedId = feedId; this.partition = partition; this.adapterFactory = adapterFactory; } @Override - public void initialize() throws HyracksDataException { - ActiveManager feedManager = (ActiveManager) ((IAppRuntimeContext) ctx.getJobletContext() - .getApplicationContext().getApplicationObject()).getActiveManager(); - AdapterRuntimeManager adapterRuntimeManager = null; - DistributeFeedFrameWriter frameDistributor = null; - IngestionRuntime ingestionRuntime = null; - boolean open = false; + protected void start() throws HyracksDataException, InterruptedException { + writer.open(); try { Thread.currentThread().setName("Intake Thread"); - // create the adapter FeedAdapter adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, partition); - // create the distributor - frameDistributor = new DistributeFeedFrameWriter(feedId, writer, FeedRuntimeType.INTAKE, partition); - // create adapter runtime manager - adapterRuntimeManager = new AdapterRuntimeManager(ctx, feedId, adapter, frameDistributor, partition); - // create and register the runtime - ActiveRuntimeId runtimeId = new ActiveRuntimeId(feedId, FeedRuntimeType.INTAKE.toString(), partition); - ingestionRuntime = new IngestionRuntime(feedId, runtimeId, frameDistributor, adapterRuntimeManager, ctx); - feedManager.registerRuntime(ingestionRuntime); - // Notify FeedJobNotificationHandler that this provider is ready to receive subscription requests. - ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(), - ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED), null); - // open the distributor - open = true; - frameDistributor.open(); - // wait until ingestion is over + adapterRuntimeManager = new AdapterRuntimeManager(ctx, runtimeId.getEntityId(), adapter, writer, partition); + TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx); + adapterRuntimeManager.start(); synchronized (adapterRuntimeManager) { while (!adapterRuntimeManager.isDone()) { adapterRuntimeManager.wait(); } } - // The ingestion is over. we need to remove the runtime from the manager - feedManager.deregisterRuntime(ingestionRuntime.getRuntimeId()); - // If there was a failure, we need to throw an exception if (adapterRuntimeManager.isFailed()) { throw new RuntimeDataException( ErrorCode.OPERATORS_FEED_INTAKE_OPERATOR_NODE_PUSHABLE_FAIL_AT_INGESTION); @@ -106,15 +80,16 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe * As the Intake job involves only the intake operator, the exception is indicative of a failure at the sibling intake operator location. * The surviving intake partitions must continue to live and receive data from the external source. */ - if (ingestionRuntime != null) { - ingestionRuntime.terminate(); - feedManager.deregisterRuntime(ingestionRuntime.getRuntimeId()); - } throw new HyracksDataException(ie); } finally { - if (open) { - frameDistributor.close(); - } + writer.close(); + } + } + + @Override + protected void abort() throws HyracksDataException, InterruptedException { + if (adapterRuntimeManager != null) { + adapterRuntimeManager.stop(); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java deleted file mode 100644 index 61451b1..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.operators; - -import org.apache.asterix.active.IActiveMessage; -import org.apache.asterix.external.feed.management.FeedConnectionId; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; - -/** - * @deprecated - * Sends a control message to the registered message queue for feed specified by its feedId. - * For messaging, use IMessageBroker interfaces - */ -@Deprecated -public class FeedMessageOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { - - private static final long serialVersionUID = 1L; - - private final FeedConnectionId connectionId; - private final IActiveMessage feedMessage; - - public FeedMessageOperatorDescriptor(JobSpecification spec, FeedConnectionId connectionId, - IActiveMessage feedMessage) { - super(spec, 0, 1); - this.connectionId = connectionId; - this.feedMessage = feedMessage; - } - - @Override - public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, - IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { - return new FeedMessageOperatorNodePushable(ctx, connectionId, feedMessage, partition); - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java deleted file mode 100644 index b273325..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.operators; - -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.active.ActiveManager; -import org.apache.asterix.active.ActiveRuntimeId; -import org.apache.asterix.active.EntityId; -import org.apache.asterix.active.IActiveMessage; -import org.apache.asterix.common.api.IAppRuntimeContext; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.external.feed.api.ISubscribableRuntime; -import org.apache.asterix.external.feed.management.FeedConnectionId; -import org.apache.asterix.external.feed.message.EndFeedMessage; -import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager; -import org.apache.asterix.external.feed.runtime.CollectionRuntime; -import org.apache.asterix.external.feed.runtime.IngestionRuntime; -import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; - -/** - * @deprecated - * Runtime for the FeedMessageOpertorDescriptor. This operator is responsible for communicating - * a feed message to the local feed manager on the host node controller. - * For messages, use IMessageBroker interfaces - * @see FeedMessageOperatorDescriptor - * IFeedMessage - * IFeedManager - */ -@Deprecated -public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable { - - private static final Logger LOGGER = Logger.getLogger(FeedMessageOperatorNodePushable.class.getName()); - - private final FeedConnectionId connectionId; - private final IActiveMessage message; - private final ActiveManager feedManager; - private final int partition; - - public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId connectionId, - IActiveMessage feedMessage, int partition) { - this.connectionId = connectionId; - this.message = feedMessage; - this.partition = partition; - IAppRuntimeContext runtimeCtx = - (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject(); - this.feedManager = (ActiveManager) runtimeCtx.getActiveManager(); - } - - @Override - public void initialize() throws HyracksDataException { - try { - writer.open(); - switch (message.getMessageType()) { - case END: - EndFeedMessage endFeedMessage = (EndFeedMessage) message; - switch (endFeedMessage.getEndMessageType()) { - case DISCONNECT_FEED: - hanldeDisconnectFeedTypeMessage(endFeedMessage); - break; - case DISCONTINUE_SOURCE: - handleDiscontinueFeedTypeMessage(endFeedMessage); - break; - default: - break; - } - break; - default: - break; - } - } catch (Exception e) { - throw new HyracksDataException(e); - } finally { - writer.close(); - } - } - - private void handleDiscontinueFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception { - EntityId sourceFeedId = endFeedMessage.getSourceFeedId(); - ActiveRuntimeId subscribableRuntimeId = - new ActiveRuntimeId(sourceFeedId, FeedRuntimeType.INTAKE.toString(), partition); - ISubscribableRuntime feedRuntime = (ISubscribableRuntime) feedManager.getRuntime(subscribableRuntimeId); - AdapterRuntimeManager adapterRuntimeManager = ((IngestionRuntime) feedRuntime).getAdapterRuntimeManager(); - adapterRuntimeManager.stop(); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Stopped Adapter " + adapterRuntimeManager); - } - } - - private void hanldeDisconnectFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Ending feed:" + endFeedMessage.getFeedConnectionId()); - } - ActiveRuntimeId runtimeId; - FeedRuntimeType subscribableRuntimeType = ((EndFeedMessage) message).getSourceRuntimeType(); - if (endFeedMessage.isCompleteDisconnection()) { - // subscribableRuntimeType represents the location at which the feed connection receives - // data - FeedRuntimeType runtimeType; - switch (subscribableRuntimeType) { - case INTAKE: - runtimeType = FeedRuntimeType.COLLECT; - break; - case COMPUTE: - runtimeType = FeedRuntimeType.COMPUTE_COLLECT; - break; - default: - throw new RuntimeDataException( - ErrorCode.OPERATORS_FEED_MSG_OPERATOR_NODE_PUSHABLE_INVALID_SUBSCRIBABLE_RUNTIME, - subscribableRuntimeType); - } - - runtimeId = new ActiveRuntimeId(endFeedMessage.getSourceFeedId(), runtimeType.toString(), partition); - CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getRuntime(runtimeId); - if (feedRuntime != null) { - feedRuntime.getSourceRuntime().unsubscribe(feedRuntime); - } - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId()); - } - } else { - // subscribaleRuntimeType represents the location for data hand-off in presence of - // subscribers - switch (subscribableRuntimeType) { - case INTAKE: - // illegal state as data hand-off from one feed to another does not happen at - // intake - throw new RuntimeDataException( - ErrorCode.OPERATORS_FEED_MSG_OPERATOR_NODE_PUSHABLE_INVALID_SUBSCRIBABLE_RUNTIME, - subscribableRuntimeType); - case COMPUTE: - // feed could be primary or secondary, doesn't matter - ActiveRuntimeId feedSubscribableRuntimeId = new ActiveRuntimeId(connectionId.getFeedId(), - FeedRuntimeType.COMPUTE.toString(), partition); - ISubscribableRuntime feedRuntime = - (ISubscribableRuntime) feedManager.getRuntime(feedSubscribableRuntimeId); - runtimeId = new ActiveRuntimeId(endFeedMessage.getSourceFeedId(), - FeedRuntimeType.COMPUTE_COLLECT.toString(), partition); - CollectionRuntime feedCollectionRuntime = (CollectionRuntime) feedManager.getRuntime(runtimeId); - feedRuntime.unsubscribe(feedCollectionRuntime); - break; - default: - break; - } - - } - - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Unsubscribed from feed :" + connectionId); - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java index 5ec0399..b794ee1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java @@ -44,6 +44,8 @@ import org.apache.hyracks.util.IntSerDeUtils; public class FeedUtils { + public static final String FEED_EXTENSION_NAME = "Feed"; + public enum JobType { INTAKE, FEED_CONNECT http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java index 171d271..d407b8a 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java @@ -31,6 +31,7 @@ import org.apache.asterix.common.memory.ConcurrentFramePool; import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler; import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; +import org.apache.asterix.external.util.FeedUtils; import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; @@ -72,7 +73,7 @@ public class InputHandlerTest extends TestCase { private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext ctx, IFrameWriter writer, FeedPolicyAccessor fpa, ConcurrentFramePool framePool) throws HyracksDataException { FrameTupleAccessor fta = Mockito.mock(FrameTupleAccessor.class); - EntityId feedId = new EntityId(FeedConnectionId.FEED_EXTENSION_NAME, DATAVERSE, FEED); + EntityId feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, DATAVERSE, FEED); FeedConnectionId connectionId = new FeedConnectionId(feedId, DATASET); ActiveRuntimeId runtimeId = new ActiveRuntimeId(feedId, FeedRuntimeType.COLLECT.toString(), 0); return new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, fpa, fta, framePool); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql index 39a6272..40aec53 100644 --- a/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql +++ b/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql @@ -30,3 +30,5 @@ use dataverse externallibtest; set wait-for-completion-feed "true"; connect feed TestTypedAdapterFeed to dataset TweetsTestAdapter; + +start feed TestTypedAdapterFeed; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql index dbe3cfa..5f3d322 100644 --- a/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql +++ b/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql @@ -19,8 +19,8 @@ /* * Description : Create a feed dataset that uses the feed simulator adapter. The feed simulator simulates feed from a file in the local fs. - Associate with the feed an external user-defined function. The UDF - finds topics in each tweet. A topic is identified by a #. + Associate with the feed an external user-defined function. The UDF + finds topics in each tweet. A topic is identified by a #. Begin ingestion and apply external user defined function * Expected Res : Success * Date : 23rd Apr 2013 @@ -48,8 +48,7 @@ create feed TweetFeed using localfs (("type-name"="TweetInputType"), ("path"="asterix_nc1://../../../../../../asterix-app/data/twitter/obamatweets.adm"), -("format"="adm")) -apply function testlib#parseTweet; +("format"="adm")); create dataset TweetsFeedIngest(TweetOutputType) primary key id; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql index d5a6f58..9642992 100644 --- a/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql +++ b/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql @@ -19,8 +19,8 @@ /* * Description : Create a feed dataset that uses the feed simulator adapter. The feed simulator simulates feed from a file in the local fs. - Associate with the feed an external user-defined function. The UDF - finds topics in each tweet. A topic is identified by a #. + Associate with the feed an external user-defined function. The UDF + finds topics in each tweet. A topic is identified by a #. Begin ingestion and apply external user defined function * Expected Res : Success * Date : 23rd Apr 2013 @@ -29,4 +29,7 @@ use dataverse externallibtest; set wait-for-completion-feed "true"; -connect feed TweetFeed to dataset TweetsFeedIngest; +connect feed TweetFeed to dataset TweetsFeedIngest +apply function testlib#parseTweet; + +start feed TweetFeed; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.4.query.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.4.query.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.4.query.aql index e4e1b45..8879fa8 100644 --- a/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.4.query.aql +++ b/asterixdb/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.4.query.aql @@ -19,8 +19,8 @@ /* * Description : Create a feed dataset that uses the feed simulator adapter. The feed simulator simulates feed from a file in the local fs. - Associate with the feed an external user-defined function. The UDF - finds topics in each tweet. A topic is identified by a #. + Associate with the feed an external user-defined function. The UDF + finds topics in each tweet. A topic is identified by a #. Begin ingestion and apply external user defined function * Expected Res : Success * Date : 23rd Apr 2013 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.02.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.02.ddl.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.02.ddl.aql index 58dea6b..1e05e37 100644 --- a/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.02.ddl.aql +++ b/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.02.ddl.aql @@ -25,3 +25,5 @@ use dataverse twitter; connect feed MessageFeed to dataset ds_tweet; + +start feed MessageFeed; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.04.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.04.ddl.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.04.ddl.aql index cf520ca..fe30266 100644 --- a/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.04.ddl.aql +++ b/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.04.ddl.aql @@ -33,3 +33,4 @@ create feed TweetFeed using localfs ); set wait-for-completion-feed "true"; connect feed TweetFeed to dataset ds_tweet; +start feed TweetFeed; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.08.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.08.ddl.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.08.ddl.aql index 860b8ed..8cb5e07 100644 --- a/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.08.ddl.aql +++ b/asterixdb/asterix-installer/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.08.ddl.aql @@ -24,4 +24,4 @@ */ use dataverse twitter; set wait-for-completion-feed "false"; -connect feed TweetFeed to dataset ds_tweet; +start feed TweetFeed; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.3.update.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.3.update.aql b/asterixdb/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.3.update.aql index 7faf013..d9b1230 100644 --- a/asterixdb/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.3.update.aql +++ b/asterixdb/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.3.update.aql @@ -24,4 +24,5 @@ use dataverse KeyVerse; set wait-for-completion-feed "true"; -connect feed KVChangeStream to dataset KVStore; \ No newline at end of file +connect feed KVChangeStream to dataset KVStore; +start feed KVChangeStream; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm b/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm index c31da8b..2975e63 100644 --- a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm +++ b/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm @@ -1 +1 @@ -804 \ No newline at end of file +788 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java index 1d0d962..4596054 100644 --- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java +++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java @@ -32,6 +32,7 @@ import org.apache.asterix.external.feed.management.FeedConnectionRequest; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; import org.apache.asterix.external.feed.watch.FeedActivityDetails; import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.FeedUtils; import org.apache.asterix.lang.aql.parser.AQLParserFactory; import org.apache.asterix.lang.common.base.IParser; import org.apache.asterix.lang.common.base.IParserFactory; @@ -44,6 +45,7 @@ import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.entities.FeedConnection; import org.apache.asterix.metadata.entities.Function; import org.apache.asterix.metadata.feeds.FeedMetadataUtil; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -54,14 +56,14 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; */ public class SubscribeFeedStatement implements Statement { + public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed"; + private static final Integer INSERT_STATEMENT_POS = 3; private static final Logger LOGGER = Logger.getLogger(SubscribeFeedStatement.class.getName()); - private final FeedConnectionRequest connectionRequest; - private Query query; private final int varCounter; private final String[] locations; - - public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed"; + private final FeedConnectionRequest connectionRequest; private final IParserFactory parserFactory = new AQLParserFactory(); + private Query query; public SubscribeFeedStatement(String[] locations, FeedConnectionRequest subscriptionRequest) { this.connectionRequest = subscriptionRequest; @@ -71,7 +73,7 @@ public class SubscribeFeedStatement implements Statement { public void initialize(MetadataTransactionContext mdTxnCtx) throws MetadataException { this.query = new Query(false); - EntityId sourceFeedId = connectionRequest.getFeedJointKey().getFeedId(); + EntityId sourceFeedId = connectionRequest.getReceivingFeedId(); Feed subscriberFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, connectionRequest.getReceivingFeedId().getDataverse(), connectionRequest.getReceivingFeedId().getEntityName()); @@ -80,18 +82,6 @@ public class SubscribeFeedStatement implements Statement { } String feedOutputType = getOutputType(mdTxnCtx); - FunctionSignature appliedFunction = subscriberFeed.getAppliedFunction(); - Function function = null; - if (appliedFunction != null) { - function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction); - if (function == null) { - throw new MetadataException(" Unknown function " + appliedFunction); - } else if (function.getParams().size() > 1) { - throw new MetadataException( - " Incompatible function: " + appliedFunction + " Number if arguments must be 1"); - } - } - StringBuilder builder = new StringBuilder(); builder.append("use dataverse " + sourceFeedId.getDataverse() + ";\n"); builder.append("set" + " " + FunctionUtil.IMPORT_PRIVATE_FUNCTIONS + " " + "'" + Boolean.TRUE + "'" + ";\n"); @@ -105,14 +95,15 @@ public class SubscribeFeedStatement implements Statement { + connectionRequest.getSubscriptionLocation().name() + "'" + "," + "'" + connectionRequest.getTargetDataset() + "'" + "," + "'" + feedOutputType + "'" + ")"); - List<String> functionsToApply = connectionRequest.getFunctionsToApply(); + List<FunctionSignature> functionsToApply = connectionRequest.getFunctionsToApply(); if ((functionsToApply != null) && functionsToApply.isEmpty()) { builder.append(" return $x"); } else { + Function function; String rValueName = "x"; String lValueName = "y"; int variableIndex = 0; - for (String functionName : functionsToApply) { + for (FunctionSignature appliedFunction : functionsToApply) { function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction); variableIndex++; switch (function.getLanguage().toUpperCase()) { @@ -122,8 +113,8 @@ public class SubscribeFeedStatement implements Statement { builder.append("\n"); break; case Function.LANGUAGE_JAVA: - builder.append(" let " + "$" + lValueName + variableIndex + ":=" + functionName + "(" + "$" - + rValueName + ")"); + builder.append(" let " + "$" + lValueName + variableIndex + ":=" + function.getName() + "(" + + "$" + rValueName + ")"); rValueName = lValueName + variableIndex; break; } @@ -141,7 +132,7 @@ public class SubscribeFeedStatement implements Statement { List<Statement> statements; try { statements = parser.parse(); - query = ((InsertStatement) statements.get(3)).getQuery(); + query = ((InsertStatement) statements.get(INSERT_STATEMENT_POS)).getQuery(); } catch (CompilationException pe) { throw new MetadataException(pe); } @@ -179,21 +170,13 @@ public class SubscribeFeedStatement implements Statement { } private String getOutputType(MetadataTransactionContext mdTxnCtx) throws MetadataException { - String outputType = null; + String outputType; EntityId feedId = connectionRequest.getReceivingFeedId(); Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName()); - FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(connectionRequest.getPolicyParameters()); try { - switch (feed.getFeedType()) { - case PRIMARY: - outputType = FeedMetadataUtil - .getOutputType(feed, feed.getAdapterConfiguration(), ExternalDataConstants.KEY_TYPE_NAME) - .getTypeName(); - break; - case SECONDARY: - outputType = FeedMetadataUtil.getSecondaryFeedOutput(feed, policyAccessor, mdTxnCtx); - break; - } + outputType = FeedMetadataUtil + .getOutputType(feed, feed.getAdapterConfiguration(), ExternalDataConstants.KEY_TYPE_NAME) + .getTypeName(); return outputType; } catch (AlgebricksException | RemoteException | ACIDException ae) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj index e5af86f..f4cdfb8 100644 --- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj +++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj @@ -98,10 +98,10 @@ import org.apache.asterix.lang.common.statement.ConnectFeedStatement; import org.apache.asterix.lang.common.statement.CreateDataverseStatement; import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement; import org.apache.asterix.lang.common.statement.CreateFeedStatement; +import org.apache.asterix.lang.common.statement.StartFeedStatement; +import org.apache.asterix.lang.common.statement.StopFeedStatement; import org.apache.asterix.lang.common.statement.CreateFunctionStatement; import org.apache.asterix.lang.common.statement.CreateIndexStatement; -import org.apache.asterix.lang.common.statement.CreatePrimaryFeedStatement; -import org.apache.asterix.lang.common.statement.CreateSecondaryFeedStatement; import org.apache.asterix.lang.common.statement.DatasetDecl; import org.apache.asterix.lang.common.statement.DataverseDecl; import org.apache.asterix.lang.common.statement.DataverseDropStatement; @@ -702,26 +702,15 @@ CreateFeedStatement FeedSpecification() throws ParseException: boolean ifNotExists = false; String adapterName = null; Map<String,String> properties = null; - FunctionSignature appliedFunction = null; CreateFeedStatement cfs = null; Pair<Identifier,Identifier> sourceNameComponents = null; } { - ( - <SECONDARY> <FEED> nameComponents = QualifiedName() ifNotExists = IfNotExists() - <FROM> <FEED> sourceNameComponents = QualifiedName() (appliedFunction = ApplyFunction())? - { - cfs = new CreateSecondaryFeedStatement(nameComponents, sourceNameComponents, appliedFunction, ifNotExists); - } - | - (<PRIMARY>)? <FEED> nameComponents = QualifiedName() ifNotExists = IfNotExists() - <USING> adapterName = AdapterName() properties = Configuration() (appliedFunction = ApplyFunction())? - { - cfs = new CreatePrimaryFeedStatement(nameComponents, adapterName, properties, appliedFunction, ifNotExists); - } - ) + <FEED> nameComponents = QualifiedName() ifNotExists = IfNotExists() + <USING> adapterName = AdapterName() properties = Configuration() { + cfs = new CreateFeedStatement(nameComponents, adapterName, properties, ifNotExists); return cfs; } } @@ -1157,19 +1146,29 @@ Statement FeedStatement() throws ParseException: Pair<Identifier,Identifier> datasetNameComponents = null; Map<String,String> configuration = null; + FunctionSignature appliedFunction = null; Statement stmt = null; String policy = null; } { ( - <CONNECT> <FEED> feedNameComponents = QualifiedName() <TO> <DATASET> datasetNameComponents = QualifiedName() (policy = GetPolicy())? + <CONNECT> <FEED> feedNameComponents = QualifiedName() <TO> <DATASET> datasetNameComponents = QualifiedName() + (appliedFunction = ApplyFunction())? (policy = GetPolicy())? { - stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, policy, getVarCounter()); + stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, appliedFunction, policy, getVarCounter()); } | <DISCONNECT> <FEED> feedNameComponents = QualifiedName() <FROM> <DATASET> datasetNameComponents = QualifiedName() { stmt = new DisconnectFeedStatement(feedNameComponents, datasetNameComponents); } + | <START> <FEED> feedNameComponents = QualifiedName() + { + stmt = new StartFeedStatement (feedNameComponents); + } + | <STOP> <FEED> feedNameComponents = QualifiedName() + { + stmt = new StopFeedStatement (feedNameComponents); + } ) { return stmt; @@ -2706,6 +2705,8 @@ TOKEN : | <SECONDARY : "secondary"> | <SELECT : "select"> | <SET : "set"> + | <START: "start"> + | <STOP: "stop"> | <SOME : "some"> | <TEMPORARY : "temporary"> | <THEN : "then"> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java index 06fbf33..612b230 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java @@ -79,12 +79,12 @@ public interface Statement extends ILangExpression { public static final byte INDEX_DECL = 0x13; public static final byte CREATE_DATAVERSE = 0x14; public static final byte INDEX_DROP = 0x15; - public static final byte CREATE_PRIMARY_FEED = 0x16; - public static final byte CREATE_SECONDARY_FEED = 0x17; - public static final byte DROP_FEED = 0x18; - public static final byte CONNECT_FEED = 0x19; - public static final byte DISCONNECT_FEED = 0x1a; - public static final byte SUBSCRIBE_FEED = 0x1b; + public static final byte CREATE_FEED = 0x16; + public static final byte DROP_FEED = 0x17; + public static final byte START_FEED = 0x18; + public static final byte STOP_FEED = 0x19; + public static final byte CONNECT_FEED = 0x1a; + public static final byte DISCONNECT_FEED = 0x1b; public static final byte CREATE_FEED_POLICY = 0x1c; public static final byte DROP_FEED_POLICY = 0x1d; public static final byte CREATE_FUNCTION = 0x1e; @@ -93,6 +93,7 @@ public interface Statement extends ILangExpression { public static final byte EXTERNAL_DATASET_REFRESH = 0x21; public static final byte RUN = 0x22; public static final byte EXTENSION = 0x23; + public static final byte SUBSCRIBE_FEED = 0x24; private Kind() { } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java index ceab6e9..0bd34ee 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java @@ -19,26 +19,28 @@ package org.apache.asterix.lang.common.statement; import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.lang.common.struct.Identifier; import org.apache.asterix.lang.common.visitor.base.ILangVisitor; import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies; import org.apache.hyracks.algebricks.common.utils.Pair; +import java.util.ArrayList; +import java.util.List; + public class ConnectFeedStatement implements Statement { private final Identifier dataverseName; private final Identifier datasetName; private final String feedName; private final String policy; - private Query query; private int varCounter; - private boolean forceConnect = false; - - public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed"; + private final ArrayList<FunctionSignature> appliedFunctions; public ConnectFeedStatement(Pair<Identifier, Identifier> feedNameCmp, Pair<Identifier, Identifier> datasetNameCmp, - String policy, int varCounter) { + FunctionSignature appliedFunction, String policy, int varCounter) { + appliedFunctions = new ArrayList<>(); if (feedNameCmp.first != null && datasetNameCmp.first != null && !feedNameCmp.first.getValue().equals(datasetNameCmp.first.getValue())) { throw new IllegalArgumentException("Dataverse for source feed and target dataset do not match"); @@ -49,15 +51,9 @@ public class ConnectFeedStatement implements Statement { this.feedName = feedNameCmp.second.getValue(); this.policy = policy != null ? policy : BuiltinFeedPolicies.DEFAULT_POLICY.getPolicyName(); this.varCounter = varCounter; - } - - public ConnectFeedStatement(Identifier dataverseName, Identifier feedName, Identifier datasetName, String policy, - int varCounter) { - this.dataverseName = dataverseName; - this.datasetName = datasetName; - this.feedName = feedName.getValue(); - this.policy = policy != null ? policy : BuiltinFeedPolicies.DEFAULT_POLICY.getPolicyName(); - this.varCounter = varCounter; + if (appliedFunction != null) { + this.appliedFunctions.add(appliedFunction); + } } public Identifier getDataverseName() { @@ -68,10 +64,6 @@ public class ConnectFeedStatement implements Statement { return datasetName; } - public Query getQuery() { - return query; - } - public int getVarCounter() { return varCounter; } @@ -90,18 +82,14 @@ public class ConnectFeedStatement implements Statement { return visitor.visit(this, arg); } - public boolean forceConnect() { - return forceConnect; - } - - public void setForceConnect(boolean forceConnect) { - this.forceConnect = forceConnect; - } - public String getFeedName() { return feedName; } + public List<FunctionSignature> getAppliedFunctions() { + return appliedFunctions; + } + @Override public byte getCategory() { return Category.UPDATE; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java index 56e7d33..1e7a182 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFeedStatement.java @@ -23,19 +23,28 @@ import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.lang.common.struct.Identifier; import org.apache.asterix.lang.common.visitor.base.ILangVisitor; +import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream; import org.apache.hyracks.algebricks.common.utils.Pair; -public abstract class CreateFeedStatement implements Statement { +import java.util.Map; + +/** + * The new create feed statement only concerns the feed adaptor configuration. + * All feeds are considered as primary feeds. + */ +public class CreateFeedStatement implements Statement { private final Pair<Identifier, Identifier> qName; - private final FunctionSignature appliedFunction; private final boolean ifNotExists; + private final String adaptorName; + private final Map<String, String> adaptorConfiguration; - public CreateFeedStatement(Pair<Identifier, Identifier> qName, FunctionSignature appliedFunction, - boolean ifNotExists) { + public CreateFeedStatement(Pair<Identifier, Identifier> qName, String adaptorName, + Map<String, String> adaptorConfiguration, boolean ifNotExists) { this.qName = qName; - this.appliedFunction = appliedFunction; this.ifNotExists = ifNotExists; + this.adaptorName = adaptorName; + this.adaptorConfiguration = adaptorConfiguration; } public Identifier getDataverseName() { @@ -46,16 +55,27 @@ public abstract class CreateFeedStatement implements Statement { return qName.second; } - public FunctionSignature getAppliedFunction() { - return appliedFunction; - } - public boolean getIfNotExists() { return this.ifNotExists; } + public String getAdaptorName() { + return adaptorName; + } + + public Map<String, String> getAdaptorConfiguration() { + return adaptorConfiguration; + } + + @Override + public byte getKind() { + return Kind.CREATE_FEED; + } + @Override - public abstract <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException; + public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException { + return visitor.visit(this, arg); + } @Override public byte getCategory() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreatePrimaryFeedStatement.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreatePrimaryFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreatePrimaryFeedStatement.java deleted file mode 100644 index c584ed0..0000000 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreatePrimaryFeedStatement.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.lang.common.statement; - -import java.util.Map; - -import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.common.functions.FunctionSignature; -import org.apache.asterix.lang.common.base.Statement; -import org.apache.asterix.lang.common.struct.Identifier; -import org.apache.asterix.lang.common.visitor.base.ILangVisitor; -import org.apache.hyracks.algebricks.common.utils.Pair; - -public class CreatePrimaryFeedStatement extends CreateFeedStatement { - - private final String adaptorName; - private final Map<String, String> adaptorConfiguration; - - public CreatePrimaryFeedStatement(Pair<Identifier, Identifier> qName, String adaptorName, - Map<String, String> adaptorConfiguration, FunctionSignature appliedFunction, boolean ifNotExists) { - super(qName, appliedFunction, ifNotExists); - this.adaptorName = adaptorName; - this.adaptorConfiguration = adaptorConfiguration; - } - - public String getAdaptorName() { - return adaptorName; - } - - public Map<String, String> getAdaptorConfiguration() { - return adaptorConfiguration; - } - - @Override - public byte getKind() { - return Statement.Kind.CREATE_PRIMARY_FEED; - } - - @Override - public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException { - return visitor.visit(this, arg); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateSecondaryFeedStatement.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateSecondaryFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateSecondaryFeedStatement.java deleted file mode 100644 index 241bcd8..0000000 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateSecondaryFeedStatement.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.lang.common.statement; - -import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.common.functions.FunctionSignature; -import org.apache.asterix.lang.common.base.Statement; -import org.apache.asterix.lang.common.struct.Identifier; -import org.apache.asterix.lang.common.visitor.base.ILangVisitor; -import org.apache.hyracks.algebricks.common.utils.Pair; - -/** - * Represents the AQL statement for creating a secondary feed. - * A secondary feed is one that derives its data from another (primary/secondary) feed. - */ -public class CreateSecondaryFeedStatement extends CreateFeedStatement { - - /** The source feed that provides data for this secondary feed. */ - private final Pair<Identifier, Identifier> sourceQName; - - public CreateSecondaryFeedStatement(Pair<Identifier, Identifier> qName, Pair<Identifier, Identifier> sourceQName, - FunctionSignature appliedFunction, boolean ifNotExists) { - super(qName, appliedFunction, ifNotExists); - this.sourceQName = sourceQName; - } - - public String getSourceFeedDataverse() { - return sourceQName.first != null ? sourceQName.first.toString() - : getDataverseName() != null ? getDataverseName().getValue() : null; - } - - public String getSourceFeedName() { - return sourceQName.second != null ? sourceQName.second.toString() : null; - } - - @Override - public byte getKind() { - return Statement.Kind.CREATE_SECONDARY_FEED; - } - - @Override - public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException { - return visitor.visit(this, arg); - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StartFeedStatement.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StartFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StartFeedStatement.java new file mode 100644 index 0000000..b3452b5 --- /dev/null +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StartFeedStatement.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.lang.common.statement; + +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.lang.common.base.Statement; +import org.apache.asterix.lang.common.struct.Identifier; +import org.apache.asterix.lang.common.visitor.base.ILangVisitor; +import org.apache.hyracks.algebricks.common.utils.Pair; + +public class StartFeedStatement implements Statement { + + public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed"; + private Identifier dataverseName; + private Identifier feedName; + + public StartFeedStatement(Pair<Identifier, Identifier> feedNameComp) { + dataverseName = feedNameComp.first; + feedName = feedNameComp.second; + } + + @Override + public byte getKind() { + return Kind.START_FEED; + } + + @Override + public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException { + return visitor.visit(this, arg); + } + + @Override + public byte getCategory() { + return Category.UPDATE; + } + + public Identifier getDataverseName() { + return dataverseName; + } + + public Identifier getFeedName() { + return feedName; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StopFeedStatement.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StopFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StopFeedStatement.java new file mode 100644 index 0000000..c45933e --- /dev/null +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StopFeedStatement.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.lang.common.statement; + +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.lang.common.base.Statement; +import org.apache.asterix.lang.common.struct.Identifier; +import org.apache.asterix.lang.common.visitor.base.ILangVisitor; +import org.apache.hyracks.algebricks.common.utils.Pair; + +public class StopFeedStatement implements Statement { + + private final Identifier dataverseName; + private final Identifier feedName; + + public StopFeedStatement(Pair<Identifier, Identifier> feedNameComp) { + this.dataverseName = feedNameComp.first; + this.feedName = feedNameComp.second; + } + + @Override + public byte getKind() { + return Kind.STOP_FEED; + } + + @Override + public byte getCategory() { + return Category.UPDATE; + } + + @Override + public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException { + return visitor.visit(this, arg); + } + + public Identifier getDataverseName() { + return dataverseName; + } + + public Identifier getFeedName() { + return feedName; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java index eefed9d..35d0a29 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java @@ -60,14 +60,14 @@ import org.apache.asterix.lang.common.expression.TypeReferenceExpression; import org.apache.asterix.lang.common.expression.UnaryExpr; import org.apache.asterix.lang.common.expression.UnorderedListTypeDefinition; import org.apache.asterix.lang.common.expression.VariableExpr; +import org.apache.asterix.lang.common.literal.IntegerLiteral; import org.apache.asterix.lang.common.statement.CompactStatement; import org.apache.asterix.lang.common.statement.ConnectFeedStatement; import org.apache.asterix.lang.common.statement.CreateDataverseStatement; import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement; +import org.apache.asterix.lang.common.statement.CreateFeedStatement; import org.apache.asterix.lang.common.statement.CreateFunctionStatement; import org.apache.asterix.lang.common.statement.CreateIndexStatement; -import org.apache.asterix.lang.common.statement.CreatePrimaryFeedStatement; -import org.apache.asterix.lang.common.statement.CreateSecondaryFeedStatement; import org.apache.asterix.lang.common.statement.DatasetDecl; import org.apache.asterix.lang.common.statement.DataverseDecl; import org.apache.asterix.lang.common.statement.DataverseDropStatement; @@ -87,6 +87,8 @@ import org.apache.asterix.lang.common.statement.NodeGroupDropStatement; import org.apache.asterix.lang.common.statement.NodegroupDecl; import org.apache.asterix.lang.common.statement.Query; import org.apache.asterix.lang.common.statement.SetStatement; +import org.apache.asterix.lang.common.statement.StartFeedStatement; +import org.apache.asterix.lang.common.statement.StopFeedStatement; import org.apache.asterix.lang.common.statement.TypeDecl; import org.apache.asterix.lang.common.statement.TypeDropStatement; import org.apache.asterix.lang.common.statement.UpdateStatement; @@ -107,14 +109,13 @@ public class FormatPrintVisitor implements ILangVisitor<Void, Integer> { private final static String CREATE = "create "; private final static String FEED = " feed "; private final static String DEFAULT_DATAVERSE_FORMAT = "org.apache.asterix.runtime.formats.NonTaggedDataFormat"; + private final PrintWriter out; protected Set<Character> validIdentifierChars = new HashSet<Character>(); protected Set<Character> validIdentifierStartChars = new HashSet<Character>(); protected String dataverseSymbol = " dataverse "; protected String datasetSymbol = " dataset "; protected String assignSymbol = ":="; - private final PrintWriter out; - public FormatPrintVisitor() { this(new PrintWriter(System.out)); } @@ -747,35 +748,36 @@ public class FormatPrintVisitor implements ILangVisitor<Void, Integer> { if (connectFeedStmt.getPolicy() != null) { out.print(" using policy " + revertStringToQuoted(connectFeedStmt.getPolicy())); } + if (connectFeedStmt.getAppliedFunctions() != null) { + out.print(" apply function " + connectFeedStmt.getAppliedFunctions()); + } out.println(SEMICOLON); return null; } @Override - public Void visit(CreatePrimaryFeedStatement cpfs, Integer step) throws CompilationException { - out.print(skip(step) + CREATE + " primary feed "); - out.print(generateFullName(cpfs.getDataverseName(), cpfs.getFeedName())); - out.print(generateIfNotExists(cpfs.getIfNotExists())); - out.print(" using " + cpfs.getAdaptorName() + " "); - printConfiguration(cpfs.getAdaptorConfiguration()); - FunctionSignature func = cpfs.getAppliedFunction(); - if (func != null) { - out.print(" apply function " + generateFullName(func.getNamespace(), func.getName())); - } + public Void visit(CreateFeedStatement cfs, Integer step) throws CompilationException { + out.print(skip(step) + "create " + FEED); + out.print(generateFullName(cfs.getDataverseName(), cfs.getFeedName())); + out.print(generateIfNotExists(cfs.getIfNotExists())); + out.print(" using " + cfs.getAdaptorName() + " "); + printConfiguration(cfs.getAdaptorConfiguration()); out.println(SEMICOLON); return null; } @Override - public Void visit(CreateSecondaryFeedStatement csfs, Integer step) throws CompilationException { - out.print(skip(step) + CREATE + " secondary feed "); - out.print(generateFullName(csfs.getDataverseName(), csfs.getFeedName())); - out.print(generateIfNotExists(csfs.getIfNotExists())); - out.print(" from feed " + generateFullName(csfs.getSourceFeedDataverse(), csfs.getSourceFeedName())); - FunctionSignature func = csfs.getAppliedFunction(); - if (func != null) { - out.print(" apply function " + generateFullName(func.getNamespace(), func.getName())); - } + public Void visit(StartFeedStatement startFeedStatement, Integer step) throws CompilationException { + out.print(skip(step) + "start " + FEED); + out.print(generateFullName(startFeedStatement.getDataverseName(), startFeedStatement.getFeedName())); + out.println(SEMICOLON); + return null; + } + + @Override + public Void visit(StopFeedStatement stopFeedStatement, Integer step) throws CompilationException { + out.print(skip(step) + "stop " + FEED); + out.print(generateFullName(stopFeedStatement.getDataverseName(), stopFeedStatement.getFeedName())); out.println(SEMICOLON); return null; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java index a4868d0..117fa77 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java @@ -28,10 +28,9 @@ import org.apache.asterix.lang.common.statement.CompactStatement; import org.apache.asterix.lang.common.statement.ConnectFeedStatement; import org.apache.asterix.lang.common.statement.CreateDataverseStatement; import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement; +import org.apache.asterix.lang.common.statement.CreateFeedStatement; import org.apache.asterix.lang.common.statement.CreateFunctionStatement; import org.apache.asterix.lang.common.statement.CreateIndexStatement; -import org.apache.asterix.lang.common.statement.CreatePrimaryFeedStatement; -import org.apache.asterix.lang.common.statement.CreateSecondaryFeedStatement; import org.apache.asterix.lang.common.statement.DatasetDecl; import org.apache.asterix.lang.common.statement.DataverseDecl; import org.apache.asterix.lang.common.statement.DataverseDropStatement; @@ -47,6 +46,8 @@ import org.apache.asterix.lang.common.statement.LoadStatement; import org.apache.asterix.lang.common.statement.NodeGroupDropStatement; import org.apache.asterix.lang.common.statement.NodegroupDecl; import org.apache.asterix.lang.common.statement.SetStatement; +import org.apache.asterix.lang.common.statement.StartFeedStatement; +import org.apache.asterix.lang.common.statement.StopFeedStatement; import org.apache.asterix.lang.common.statement.TypeDecl; import org.apache.asterix.lang.common.statement.TypeDropStatement; import org.apache.asterix.lang.common.statement.UpdateStatement; @@ -180,12 +181,17 @@ public abstract class AbstractQueryExpressionVisitor<R, T> implements ILangVisit } @Override - public R visit(CreatePrimaryFeedStatement del, T arg) throws CompilationException { + public R visit(CreateFeedStatement cfs, T arg) throws CompilationException { return null; } @Override - public R visit(CreateSecondaryFeedStatement del, T arg) throws CompilationException { + public R visit(StartFeedStatement sfs, T arg) throws CompilationException { + return null; + } + + @Override + public R visit(StopFeedStatement sfs, T arg) throws CompilationException { return null; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java index 4a5c5ed..cace925 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java @@ -44,10 +44,9 @@ import org.apache.asterix.lang.common.statement.CompactStatement; import org.apache.asterix.lang.common.statement.ConnectFeedStatement; import org.apache.asterix.lang.common.statement.CreateDataverseStatement; import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement; +import org.apache.asterix.lang.common.statement.CreateFeedStatement; import org.apache.asterix.lang.common.statement.CreateFunctionStatement; import org.apache.asterix.lang.common.statement.CreateIndexStatement; -import org.apache.asterix.lang.common.statement.CreatePrimaryFeedStatement; -import org.apache.asterix.lang.common.statement.CreateSecondaryFeedStatement; import org.apache.asterix.lang.common.statement.DatasetDecl; import org.apache.asterix.lang.common.statement.DataverseDecl; import org.apache.asterix.lang.common.statement.DataverseDropStatement; @@ -65,6 +64,8 @@ import org.apache.asterix.lang.common.statement.NodeGroupDropStatement; import org.apache.asterix.lang.common.statement.NodegroupDecl; import org.apache.asterix.lang.common.statement.Query; import org.apache.asterix.lang.common.statement.SetStatement; +import org.apache.asterix.lang.common.statement.StartFeedStatement; +import org.apache.asterix.lang.common.statement.StopFeedStatement; import org.apache.asterix.lang.common.statement.TypeDecl; import org.apache.asterix.lang.common.statement.TypeDropStatement; import org.apache.asterix.lang.common.statement.UpdateStatement; @@ -152,9 +153,11 @@ public interface ILangVisitor<R, T> { R visit(ConnectFeedStatement del, T arg) throws CompilationException; - R visit(CreatePrimaryFeedStatement cpfs, T arg) throws CompilationException; + R visit(StartFeedStatement sfs, T arg) throws CompilationException; - R visit(CreateSecondaryFeedStatement csfs, T arg) throws CompilationException; + R visit(StopFeedStatement sfs, T arg) throws CompilationException; + + R visit(CreateFeedStatement cfs, T arg) throws CompilationException; R visit(FeedDropStatement del, T arg) throws CompilationException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj index 81f00ee..d791c85 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj +++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj @@ -89,13 +89,13 @@ import org.apache.asterix.lang.common.literal.TrueLiteral; import org.apache.asterix.lang.common.parser.ScopeChecker; import org.apache.asterix.lang.common.statement.CompactStatement; import org.apache.asterix.lang.common.statement.ConnectFeedStatement; +import org.apache.asterix.lang.common.statement.StartFeedStatement; +import org.apache.asterix.lang.common.statement.StopFeedStatement; import org.apache.asterix.lang.common.statement.CreateDataverseStatement; import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement; import org.apache.asterix.lang.common.statement.CreateFeedStatement; import org.apache.asterix.lang.common.statement.CreateFunctionStatement; import org.apache.asterix.lang.common.statement.CreateIndexStatement; -import org.apache.asterix.lang.common.statement.CreatePrimaryFeedStatement; -import org.apache.asterix.lang.common.statement.CreateSecondaryFeedStatement; import org.apache.asterix.lang.common.statement.DatasetDecl; import org.apache.asterix.lang.common.statement.DataverseDecl; import org.apache.asterix.lang.common.statement.DataverseDropStatement; @@ -741,30 +741,16 @@ CreateFeedStatement FeedSpecification() throws ParseException: boolean ifNotExists = false; String adapterName = null; Map<String,String> properties = null; - FunctionSignature appliedFunction = null; CreateFeedStatement cfs = null; Pair<Identifier,Identifier> sourceNameComponents = null; - } { - ( - <SECONDARY> <FEED> nameComponents = QualifiedName() ifNotExists = IfNotExists() - <FROM> <FEED> sourceNameComponents = QualifiedName() (appliedFunction = ApplyFunction())? - { - cfs = new CreateSecondaryFeedStatement(nameComponents, - sourceNameComponents, appliedFunction, ifNotExists); - } - | - (<PRIMARY>)? <FEED> nameComponents = QualifiedName() ifNotExists = IfNotExists() - <USING> adapterName = AdapterName() properties = Configuration() (appliedFunction = ApplyFunction())? - { - cfs = new CreatePrimaryFeedStatement(nameComponents, - adapterName, properties, appliedFunction, ifNotExists); - } - ) - { - return cfs; - } + <FEED> nameComponents = QualifiedName() ifNotExists = IfNotExists() + <USING> adapterName = AdapterName() properties = Configuration() + { + cfs = new CreateFeedStatement(nameComponents, adapterName, properties, ifNotExists); + return cfs; + } } CreateFeedPolicyStatement FeedPolicySpecification() throws ParseException: @@ -1185,12 +1171,43 @@ Statement ConnectionStatement() throws ParseException: ( <CONNECT> stmt = ConnectStatement() | <DISCONNECT> stmt = DisconnectStatement() + | <START> stmt = StartStatement() + | <STOP> stmt = StopStatement() ) { return stmt; } } +Statement StartStatement() throws ParseException: +{ + Pair<Identifier,Identifier> feedNameComponents = null; + + Statement stmt = null; +} +{ + <FEED> feedNameComponents = QualifiedName() + { + stmt = new StartFeedStatement (feedNameComponents); + return stmt; + } +} + +Statement StopStatement () throws ParseException: +{ + Pair<Identifier,Identifier> feedNameComponents = null; + + Statement stmt = null; +} +{ + <FEED> feedNameComponents = QualifiedName() + { + stmt = new StopFeedStatement (feedNameComponents); + return stmt; + } +} + + Statement DisconnectStatement() throws ParseException: { Pair<Identifier,Identifier> feedNameComponents = null; @@ -1218,14 +1235,17 @@ Statement ConnectStatement() throws ParseException: Pair<Identifier,Identifier> datasetNameComponents = null; Map<String,String> configuration = null; + FunctionSignature appliedFunction = null; Statement stmt = null; String policy = null; } { ( - <FEED> feedNameComponents = QualifiedName() <TO> Dataset() datasetNameComponents = QualifiedName() (policy = GetPolicy())? + <FEED> feedNameComponents = QualifiedName() <TO> Dataset() datasetNameComponents = QualifiedName() + (appliedFunction = ApplyFunction())? (policy = GetPolicy())? { - stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, policy, getVarCounter()); + stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, appliedFunction, + policy, getVarCounter()); } ) { @@ -3200,6 +3220,8 @@ TOKEN [IGNORE_CASE]: | <SELECT : "select"> | <SET : "set"> | <SOME : "some"> + | <START : "start"> + | <STOP : "stop"> | <TEMPORARY : "temporary"> | <THEN : "then"> | <TYPE : "type">
