http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index f4ef1a1..8a25888 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -65,19 +65,19 @@ import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.external.api.IAdapterFactory; +import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber; +import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent; import org.apache.asterix.external.feed.api.IFeed; import org.apache.asterix.external.feed.api.IFeed.FeedType; import org.apache.asterix.external.feed.api.IFeedJoint; import org.apache.asterix.external.feed.api.IFeedJoint.FeedJointType; -import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber; -import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent; +import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber; import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.feed.management.FeedConnectionRequest; import org.apache.asterix.external.feed.management.FeedEventsListener; import org.apache.asterix.external.feed.management.FeedJointKey; -import org.apache.asterix.external.feed.management.FeedLifecycleEventSubscriber; import 
org.apache.asterix.external.feed.policy.FeedPolicyAccessor; -import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails; +import org.apache.asterix.external.feed.watch.FeedActivityDetails; import org.apache.asterix.external.feed.watch.FeedConnectJobInfo; import org.apache.asterix.external.feed.watch.FeedIntakeInfo; import org.apache.asterix.external.indexing.ExternalFile; @@ -342,7 +342,12 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen break; case Statement.Kind.INSERT: case Statement.Kind.UPSERT: - handleInsertUpsertStatement(metadataProvider, stmt, hcc); + if (((InsertStatement) stmt).getReturnQuery() != null) { + metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++)); + metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC + || resultDelivery == ResultDelivery.ASYNC_DEFERRED); + } + handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, stats, false); break; case Statement.Kind.DELETE: handleDeleteStatement(metadataProvider, stmt, hcc); @@ -393,7 +398,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // No op break; case Statement.Kind.EXTENSION: - ((IExtensionStatement) stmt).handle(this, metadataProvider, hcc); + ((IExtensionStatement) stmt).handle(this, metadataProvider, hcc, hdc, resultDelivery, stats, + resultSetIdCounter); break; default: throw new AsterixException("Unknown function"); @@ -506,7 +512,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - protected void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt, + public void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc) throws AsterixException, Exception { MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS); DatasetDecl dd = (DatasetDecl) stmt; @@ -602,8 +608,7 @@ public class 
QueryTranslator extends AbstractLangTranslator implements IStatemen break; case EXTERNAL: String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter(); - Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()) - .getProperties(); + Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties(); datasetDetails = new ExternalDatasetDetails(adapter, properties, new Date(), ExternalDatasetTransactionState.COMMIT); @@ -705,7 +710,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen StringBuilder builder = null; IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); for (IActiveEntityEventsListener listener : listeners) { - if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) { + if (listener.isEntityUsingDataset(dataverseName, datasetName)) { if (builder == null) { builder = new StringBuilder(); } @@ -730,8 +735,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - protected String configureNodegroupForDataset(DatasetDecl dd, String dataverse, - MetadataTransactionContext mdTxnCtx) + protected String configureNodegroupForDataset(DatasetDecl dd, String dataverse, MetadataTransactionContext mdTxnCtx) throws AsterixException { int nodegroupCardinality; String nodegroupName; @@ -984,8 +988,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen ARecordType enforcedType = null; if (stmtCreateIndex.isEnforced()) { - enforcedType = createEnforcedType(aRecordType, - Lists.newArrayList(index)); + enforcedType = createEnforcedType(aRecordType, Lists.newArrayList(index)); } // #. prepare to create the index artifact in NC. 
@@ -1350,7 +1353,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName); } - protected void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, + public void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc) throws Exception { DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt; String dataverseName = getActiveDataverse(stmtDelete.getDataverseName()); @@ -1369,8 +1372,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); return; } else { - throw new AlgebricksException("There is no dataset with this name " + datasetName - + " in dataverse " + dataverseName + "."); + throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse " + + dataverseName + "."); } } @@ -1424,7 +1427,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // prepare job spec(s) that would disconnect any active feeds involving the dataset. 
IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); for (IActiveEntityEventsListener listener : activeListeners) { - if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) { + if (listener.isEntityUsingDataset(dataverseName, datasetName)) { throw new AsterixException( "Can't drop dataset since it is connected to active entity: " + listener.getEntityId()); } @@ -1482,8 +1485,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } else { CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexes.get(j).getIndexName()); - jobsToExecute.add( - ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds)); + jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds)); } } @@ -1547,7 +1549,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); StringBuilder builder = null; for (IActiveEntityEventsListener listener : listeners) { - if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) { + if (listener.isEntityUsingDataset(dataverseName, datasetName)) { if (builder == null) { builder = new StringBuilder(); } @@ -1555,9 +1557,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } if (builder != null) { - throw new AsterixException( - "Dataset" + datasetName + " is currently being fed into by the following active entities: " - + builder.toString()); + throw new AsterixException("Dataset" + datasetName + + " is currently being fed into by the following active entities: " + builder.toString()); } if (ds.getDatasetType() == DatasetType.INTERNAL) { @@ -1578,11 +1579,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // #. 
mark PendingDropOp on the existing index MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); MetadataManager.INSTANCE.addIndex(mdTxnCtx, - new Index(dataverseName, datasetName, indexName, index.getIndexType(), - index.getKeyFieldNames(), + new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(), - index.isEnforcingKeyFileds(), index.isPrimaryIndex(), - IMetadataEntity.PENDING_DROP_OP)); + index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP)); // #. commit the existing transaction before calling runJob. MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -1644,11 +1643,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // #. mark PendingDropOp on the existing index MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); MetadataManager.INSTANCE.addIndex(mdTxnCtx, - new Index(dataverseName, datasetName, indexName, index.getIndexType(), - index.getKeyFieldNames(), + new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(), - index.isEnforcingKeyFileds(), index.isPrimaryIndex(), - IMetadataEntity.PENDING_DROP_OP)); + index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP)); // #. commit the existing transaction before calling runJob. MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -1707,9 +1704,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } catch (Exception e2) { e.addSuppressed(e2); abort(e, e2, mdTxnCtx); - throw new IllegalStateException("System is inconsistent state: pending index(" - + dataverseName + "." + datasetName + "." 
- + indexName + ") couldn't be removed from the metadata", e); + throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName + "." + + datasetName + "." + indexName + ") couldn't be removed from the metadata", e); } } @@ -1748,8 +1744,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - protected void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) - throws Exception { + protected void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception { NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt; String nodegroupName = stmtDelete.getNodeGroupName().getValue(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); @@ -1859,40 +1854,61 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - protected void handleInsertUpsertStatement(AqlMetadataProvider metadataProvider, Statement stmt, - IHyracksClientConnection hcc) throws Exception { + public JobSpecification handleInsertUpsertStatement(AqlMetadataProvider metadataProvider, Statement stmt, + IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, + IStatementExecutor.Stats stats, boolean compileOnly) throws Exception { InsertStatement stmtInsertUpsert = (InsertStatement) stmt; + String dataverseName = getActiveDataverse(stmtInsertUpsert.getDataverseName()); Query query = stmtInsertUpsert.getQuery(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); boolean bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); + if (stmtInsertUpsert.getReturnQuery() != null) { + if (!stmtInsertUpsert.getReturnQuery().getDatasets().isEmpty()) { + throw new AsterixException("Cannot use datasets in an insert returning query"); + } + // returnQuery Rewriting (happens under the same ongoing metadata transaction) + Pair<Query, Integer> 
rewrittenReturnQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, + stmtInsertUpsert.getReturnQuery(), sessionConfig); + + stmtInsertUpsert.getQuery().setVarCounter(rewrittenReturnQuery.first.getVarCounter()); + stmtInsertUpsert.setRewrittenReturnQuery(rewrittenReturnQuery.first); + stmtInsertUpsert.addToVarCounter(rewrittenReturnQuery.second); + } + MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(dataverseName, dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(), query.getDatasets()); - + JobSpecification compiled = null; try { metadataProvider.setWriteTransaction(true); CompiledInsertStatement clfrqs = null; switch (stmtInsertUpsert.getKind()) { case Statement.Kind.INSERT: clfrqs = new CompiledInsertStatement(dataverseName, stmtInsertUpsert.getDatasetName().getValue(), - query, stmtInsertUpsert.getVarCounter()); + query, stmtInsertUpsert.getVarCounter(), stmtInsertUpsert.getVar(), + stmtInsertUpsert.getReturnQuery()); break; case Statement.Kind.UPSERT: clfrqs = new CompiledUpsertStatement(dataverseName, stmtInsertUpsert.getDatasetName().getValue(), - query, stmtInsertUpsert.getVarCounter()); + query, stmtInsertUpsert.getVarCounter(), stmtInsertUpsert.getVar(), + stmtInsertUpsert.getReturnQuery()); break; default: throw new AlgebricksException("Unsupported statement type " + stmtInsertUpsert.getKind()); } - JobSpecification compiled = rewriteCompileQuery(metadataProvider, query, clfrqs); + compiled = rewriteCompileQuery(metadataProvider, query, clfrqs); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - if (compiled != null) { - JobUtils.runJob(hcc, compiled, true); + if (compiled != null && !compileOnly) { + if (stmtInsertUpsert.getReturnQuery() != null) { + handleQueryResult(metadataProvider, hcc, hdc, compiled, resultDelivery, stats); + } else { + JobUtils.runJob(hcc, compiled, true); + } } } catch (Exception e) { @@ -1905,9 +1921,10 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(), query.getDatasets()); } + return compiled; } - protected void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt, + public void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc) throws Exception { DeleteStatement stmtDelete = (DeleteStatement) stmt; @@ -1948,7 +1965,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen @Override public JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query, ICompiledDmlStatement stmt) - throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException { + throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException { // Query Rewriting (happens under the same ongoing metadata transaction) Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query, @@ -2034,12 +2051,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen String description = cfps.getDescription() == null ? 
"" : cfps.getDescription(); if (extendingExisting) { FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE - .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, - cfps.getSourcePolicyName()); + .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName()); if (sourceFeedPolicy == null) { - sourceFeedPolicy = - MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(), - MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName()); + sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(), + MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName()); if (sourceFeedPolicy == null) { throw new AlgebricksException("Unknown policy " + cfps.getSourcePolicyName()); } @@ -2158,7 +2173,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); boolean subscriberRegistered = false; - IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber(); + IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber(); FeedConnectionId feedConnId = null; EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, cfs.getFeedName()); FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE @@ -2211,7 +2226,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen triple.third.get(0), pair.first); pair.first.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, activeJob); JobUtils.runJob(hcc, pair.first, false); - eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED); + eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_INTAKE_STARTED); } else { for (IFeedJoint fj : triple.third) { listener.registerFeedJoint(fj, 0); @@ -2219,9 +2234,9 @@ 
public class QueryTranslator extends AbstractLangTranslator implements IStatemen } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_STARTED); + eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_STARTED); if (Boolean.valueOf(metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION))) { - eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_ENDED); // blocking call + eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_ENDED); // blocking call } } catch (Exception e) { if (bActiveTxn) { @@ -2251,7 +2266,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen */ protected Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> getFeedConnectionRequest(String dataverse, Feed feed, String dataset, FeedPolicyEntity feedPolicy, MetadataTransactionContext mdTxnCtx) - throws AsterixException { + throws AsterixException { IFeedJoint sourceFeedJoint; FeedConnectionRequest request; List<String> functionsToApply = new ArrayList<>(); @@ -2349,10 +2364,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx); EntityId entityId = new EntityId(Feed.EXTENSION_NAME, feed.getDataverseName(), feed.getFeedName()); FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), cfs.getDatasetName().getValue()); - IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber(); + IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber(); FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE .getActiveEntityListener(entityId); - if (listener == null || !listener.isEntityConnectedToDataset(dataverseName, datasetName)) { + if (listener == null || !listener.isEntityUsingDataset(dataverseName, datasetName)) { 
throw new AsterixException("Feed " + feed.getFeedId().getEntityName() + " is currently not connected to " + cfs.getDatasetName().getValue() + ". Invalid operation!"); } @@ -2372,7 +2387,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; JobUtils.runJob(hcc, jobSpec, true); - eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_ENDED); + eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_ENDED); } catch (Exception e) { if (bActiveTxn) { abort(e, e, mdTxnCtx); @@ -2428,8 +2443,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (compiled != null) { FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE .getActiveEntityListener(bfs.getSubscriptionRequest().getReceivingFeedId()); - FeedConnectJobInfo activeJob = new FeedConnectJobInfo( - bfs.getSubscriptionRequest().getReceivingFeedId(), null, ActivityState.ACTIVE, + FeedConnectJobInfo activeJob = new FeedConnectJobInfo(bfs.getSubscriptionRequest().getReceivingFeedId(), + null, ActivityState.ACTIVE, new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(), dataset), listener.getSourceFeedJoint(), null, alteredJobSpec, policy.getProperties()); alteredJobSpec.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, activeJob); @@ -2484,8 +2499,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen dataverseName); jobsToExecute.add(DatasetOperations.compactDatasetJobSpec(dataverse, datasetName, metadataProvider)); ARecordType aRecordType = (ARecordType) dt.getDatatype(); - ARecordType enforcedType = createEnforcedType( - aRecordType, indexes); + ARecordType enforcedType = createEnforcedType(aRecordType, indexes); if (ds.getDatasetType() == DatasetType.INTERNAL) { for (int j = 0; j < indexes.size(); j++) { if (indexes.get(j).isSecondaryIndex()) { @@ -2541,59 +2555,25 @@ 
public class QueryTranslator extends AbstractLangTranslator implements IStatemen jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds, metadataProvider)); } - protected void handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc, - IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats) throws Exception { - + protected JobSpecification handleQuery(AqlMetadataProvider metadataProvider, Query query, + IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats) + throws Exception { MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); boolean bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); MetadataLockManager.INSTANCE.queryBegin(activeDefaultDataverse, query.getDataverses(), query.getDatasets()); + JobSpecification compiled; try { - JobSpecification compiled = rewriteCompileQuery(metadataProvider, query, null); + compiled = rewriteCompileQuery(metadataProvider, query, null); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; if (query.isExplain()) { sessionConfig.out().flush(); - return; + return compiled; } else if (sessionConfig.isExecuteQuery() && compiled != null) { - if (GlobalConfig.ASTERIX_LOGGER.isLoggable(Level.FINE)) { - GlobalConfig.ASTERIX_LOGGER.fine(compiled.toJSON().toString(1)); - } - JobId jobId = JobUtils.runJob(hcc, compiled, false); - - JSONObject response = new JSONObject(); - switch (resultDelivery) { - case ASYNC: - JSONArray handle = new JSONArray(); - handle.put(jobId.getId()); - handle.put(metadataProvider.getResultSetId().getId()); - response.put("handle", handle); - sessionConfig.out().print(response); - sessionConfig.out().flush(); - hcc.waitForCompletion(jobId); - break; - case SYNC: - hcc.waitForCompletion(jobId); - ResultReader resultReader = new ResultReader(hdc); - resultReader.open(jobId, metadataProvider.getResultSetId()); - 
ResultUtil.displayResults(resultReader, sessionConfig, stats, - metadataProvider.findOutputRecordType()); - break; - case ASYNC_DEFERRED: - handle = new JSONArray(); - handle.put(jobId.getId()); - handle.put(metadataProvider.getResultSetId().getId()); - response.put("handle", handle); - hcc.waitForCompletion(jobId); - sessionConfig.out().print(response); - sessionConfig.out().flush(); - break; - default: - break; - - } + handleQueryResult(metadataProvider, hcc, hdc, compiled, resultDelivery, stats); } } catch (Exception e) { LOGGER.log(Level.INFO, e.getMessage(), e); @@ -2606,6 +2586,47 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // release external datasets' locks acquired during compilation of the query ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); } + return compiled; + } + + private void handleQueryResult(AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc, + IHyracksDataset hdc, JobSpecification compiled, ResultDelivery resultDelivery, Stats stats) + throws Exception { + if (GlobalConfig.ASTERIX_LOGGER.isLoggable(Level.FINE)) { + GlobalConfig.ASTERIX_LOGGER.fine(compiled.toJSON().toString(1)); + } + JobId jobId = JobUtils.runJob(hcc, compiled, false); + + JSONObject response = new JSONObject(); + switch (resultDelivery) { + case ASYNC: + JSONArray handle = new JSONArray(); + handle.put(jobId.getId()); + handle.put(metadataProvider.getResultSetId().getId()); + response.put("handle", handle); + sessionConfig.out().print(response); + sessionConfig.out().flush(); + hcc.waitForCompletion(jobId); + break; + case SYNC: + hcc.waitForCompletion(jobId); + ResultReader resultReader = new ResultReader(hdc); + resultReader.open(jobId, metadataProvider.getResultSetId()); + ResultUtil.displayResults(resultReader, sessionConfig, stats, metadataProvider.findOutputRecordType()); + break; + case ASYNC_DEFERRED: + handle = new JSONArray(); + handle.put(jobId.getId()); + 
handle.put(metadataProvider.getResultSetId().getId()); + response.put("handle", handle); + hcc.waitForCompletion(jobId); + sessionConfig.out().print(response); + sessionConfig.out().flush(); + break; + default: + break; + + } } protected void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt) @@ -2876,8 +2897,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen break; default: throw new AlgebricksException( - "The system \"" + runStmt.getSystem() + - "\" specified in your run statement is not supported."); + "The system \"" + runStmt.getSystem() + "\" specified in your run statement is not supported."); } }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index a548b3a..a806532 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -170,7 +170,7 @@ public class TestNodeController { indexOpDesc, ctx, PARTITION, primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDescProvider, op, true); CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(), dataset.getDatasetId(), - primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION); + primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION, true); insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc); commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc); return insertOp; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java index 6e3f94c..2716859 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java @@ -30,6 +30,7 @@ import java.util.logging.Logger; import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; import org.apache.asterix.api.java.AsterixJavaClient; 
+import org.apache.asterix.app.cc.CompilerExtensionManager; import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.exceptions.AsterixException; @@ -38,6 +39,7 @@ import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.compiler.provider.SqlppCompilationProvider; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.IdentitiyResolverFactory; +import org.apache.asterix.runtime.util.AsterixAppContextInfo; import org.apache.asterix.test.base.AsterixTestHelper; import org.apache.asterix.test.common.TestHelper; import org.apache.asterix.test.runtime.HDFSCluster; @@ -66,15 +68,16 @@ public class OptimizerTest { + "optimizerts" + SEPARATOR; private static final String PATH_QUERIES = PATH_BASE + "queries" + SEPARATOR; private static final String PATH_EXPECTED = PATH_BASE + "results" + SEPARATOR; - private static final String PATH_ACTUAL = "target" + File.separator + "opttest" + SEPARATOR; + protected static final String PATH_ACTUAL = "target" + File.separator + "opttest" + SEPARATOR; private static final ArrayList<String> ignore = AsterixTestHelper.readFile(FILENAME_IGNORE, PATH_BASE); private static final ArrayList<String> only = AsterixTestHelper.readFile(FILENAME_ONLY, PATH_BASE); - private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml"; + protected static String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml"; private static final ILangCompilationProvider aqlCompilationProvider = new AqlCompilationProvider(); private static final ILangCompilationProvider sqlppCompilationProvider = new SqlppCompilationProvider(); + protected static ILangCompilationProvider extensionLangCompilationProvider = null; - private static AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); + protected static 
AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); @BeforeClass public static void setUp() throws Exception { @@ -171,9 +174,13 @@ public class OptimizerTest { PrintWriter plan = new PrintWriter(actualFile); ILangCompilationProvider provider = queryFile.getName().endsWith("aql") ? aqlCompilationProvider : sqlppCompilationProvider; + if (extensionLangCompilationProvider != null) { + provider = extensionLangCompilationProvider; + } IHyracksClientConnection hcc = integrationUtil.getHyracksClientConnection(); AsterixJavaClient asterix = new AsterixJavaClient(hcc, query, plan, provider, - new DefaultStatementExecutorFactory(null)); + new DefaultStatementExecutorFactory( + (CompilerExtensionManager) AsterixAppContextInfo.INSTANCE.getExtensionManager())); try { asterix.compile(true, false, false, true, true, false, false); } catch (AsterixException e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java index 39a4d3b..3929113 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java @@ -52,8 +52,12 @@ public class ExecutionTestUtil { return setUp(cleanup, TEST_CONFIG_FILE_NAME, integrationUtil, true); } + public static List<ILibraryManager> setUp(boolean cleanup, String configFile) throws Exception { + return setUp(cleanup, configFile, integrationUtil, true); + } + public static List<ILibraryManager> setUp(boolean cleanup, String configFile, - AsterixHyracksIntegrationUtil integrationUtil, boolean startHdfs) throws Exception { + 
AsterixHyracksIntegrationUtil alternateIntegrationUtil, boolean startHdfs) throws Exception { System.out.println("Starting setup"); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Starting setup"); @@ -63,6 +67,7 @@ public class ExecutionTestUtil { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("initializing pseudo cluster"); } + integrationUtil = alternateIntegrationUtil; integrationUtil.init(cleanup); if (LOGGER.isLoggable(Level.INFO)) { @@ -87,8 +92,8 @@ public class ExecutionTestUtil { libraryManagers.add(AsterixAppContextInfo.INSTANCE.getLibraryManager()); // Adds library managers for NCs, one-per-NC. for (NodeControllerService nc : integrationUtil.ncs) { - IAsterixAppRuntimeContext runtimeCtx = - (IAsterixAppRuntimeContext) nc.getApplicationContext().getApplicationObject(); + IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) nc.getApplicationContext() + .getApplicationObject(); libraryManagers.add(runtimeCtx.getLibraryManager()); } return libraryManagers; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/optimizerts/queries/insert-return-custom-result.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/insert-return-custom-result.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/insert-return-custom-result.aql new file mode 100644 index 0000000..205edc2 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/insert-return-custom-result.aql @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +drop dataverse test if exists; +create dataverse test; +use dataverse test; + +create type TweetMessageTypeuuid as closed { + tweetid: int, + message-text: string, + location:point +} + +create dataset TweetMessageuuids(TweetMessageTypeuuid) +primary key tweetid; + + +insert into dataset TweetMessageuuids as $a( +let $x := +[{ "tweetid":1,"message-text":"hello","location":create-point(6.0,6.0)}, +{"tweetid":2,"message-text":"goodbye","location":create-point(1.0,1.0)}, +{"tweetid":3,"message-text":"the end","location":create-point(6.0,3.0)}, +{"tweetid":4,"message-text":"what","location":create-point(3.0,6.0)}, +{"tweetid":5,"message-text":"good","location":create-point(5.0,6.0)}] +for $y in $x +return $y +) returning +let $x := create-circle($a.location,5.0) +order by $a.tweetid +return { + "x":$x, + "tweetid":$a.tweetid +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan new file mode 100644 index 0000000..391b248 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan @@ -0,0 +1,17 @@ +-- DISTRIBUTE_RESULT |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- STREAM_PROJECT 
|PARTITIONED| + -- ASSIGN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- STABLE_SORT [$$16(ASC)] |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- COMMIT |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- INSERT_DELETE |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$16] |PARTITIONED| + -- ASSIGN |UNPARTITIONED| + -- STREAM_PROJECT |UNPARTITIONED| + -- UNNEST |UNPARTITIONED| + -- ASSIGN |UNPARTITIONED| + -- EMPTY_TUPLE_SOURCE |UNPARTITIONED| \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.1.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.1.ddl.aql new file mode 100644 index 0000000..a75f9da --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.1.ddl.aql @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Test case Name : insert-return-records + * Description : Check records returned on insert + * Expected Result : Success + * Date : Mar 2015 + */ + +drop dataverse test if exists; +create dataverse test; +use dataverse test; + +create type TweetMessageTypeuuid as closed { + tweetid: int, + message-text: string +} + +create dataset TweetMessageuuids(TweetMessageTypeuuid) +primary key tweetid; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.3.query.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.3.query.aql new file mode 100644 index 0000000..6fc7112 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.3.query.aql @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Test case Name : insert-return-records + * Description : Check records returned on insert + * Expected Result : Success + * Date : Mar 2015 + */ + + use dataverse test; + +insert into dataset TweetMessageuuids as $message ( +[{ "tweetid":1,"message-text":"hello"}, +{"tweetid":2,"message-text":"goodbye"}, +{"tweetid":3,"message-text":"the end"}, +{"tweetid":4,"message-text":"what"}, +{"tweetid":5,"message-text":"good"}] +) returning $message; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.1.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.1.ddl.aql new file mode 100644 index 0000000..9e77afb --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.1.ddl.aql @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Test case Name : insert-returning-fieldname + * Description : Check fields returned on insert + * Expected Result : Success + * Date : Mar 2015 + */ + + +drop dataverse test if exists; +create dataverse test; +use dataverse test; + +create type TweetMessageTypeuuid as closed { + tweetid: uuid, + message-text: string +} + +create dataset TweetMessageuuids(TweetMessageTypeuuid) +primary key tweetid autogenerated; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.3.query.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.3.query.aql new file mode 100644 index 0000000..6a7be26 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.3.query.aql @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Test case Name : insert-returning-fieldname + * Description : Check fields returned on insert + * Expected Result : Success + * Date : Mar 2015 + */ + +use dataverse test; + +insert into dataset TweetMessageuuids as $message ( +{ "message-text":"hello"} +) returning $message.message-text; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.1.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.1.ddl.aql new file mode 100644 index 0000000..a75f9da --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.1.ddl.aql @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Test case Name : insert-return-records + * Description : Check records returned on insert + * Expected Result : Success + * Date : Mar 2015 + */ + +drop dataverse test if exists; +create dataverse test; +use dataverse test; + +create type TweetMessageTypeuuid as closed { + tweetid: int, + message-text: string +} + +create dataset TweetMessageuuids(TweetMessageTypeuuid) +primary key tweetid; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.3.query.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.3.query.aql new file mode 100644 index 0000000..417860d --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.3.query.aql @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Test case Name : insert-with-bad-return + * Description : Throw an error + * Expected Result : Error + * Date : Oct 2016 + */ + + use dataverse test; + +insert into dataset TweetMessageuuids as $message ( +[{ "tweetid":1,"message-text":"hello"}, +{"tweetid":2,"message-text":"goodbye"}, +{"tweetid":3,"message-text":"the end"}, +{"tweetid":4,"message-text":"what"}, +{"tweetid":5,"message-text":"good"}] +) returning +for $result in dataset TweetMessageuuids +where $result.message-text = $message +return $result; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.1.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.1.ddl.aql new file mode 100644 index 0000000..50ff8ae --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.1.ddl.aql @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Test case Name : upsert-return-custom-result + * Description : Check records returned on upsert + * Expected Result : Success + * Date : Mar 2015 + */ + +drop dataverse test if exists; +create dataverse test; +use dataverse test; + +create type TweetMessageTypeuuid as closed { + tweetid: int, + message-text: string, + location:point +} + +create dataset TweetMessageuuids(TweetMessageTypeuuid) +primary key tweetid; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.3.query.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.3.query.aql new file mode 100644 index 0000000..9abf667 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.3.query.aql @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Test case Name : upsert-return-custom-result + * Description : Check records returned on upsert + * Expected Result : Success + * Date : Mar 2015 + */ + + use dataverse test; + +upsert into dataset TweetMessageuuids as $a ( +let $x := +[{ "tweetid":1,"message-text":"hello","location":create-point(6.0,6.0)}, +{"tweetid":2,"message-text":"goodbye","location":create-point(1.0,1.0)}, +{"tweetid":3,"message-text":"the end","location":create-point(6.0,3.0)}, +{"tweetid":4,"message-text":"what","location":create-point(3.0,6.0)}, +{"tweetid":5,"message-text":"good","location":create-point(5.0,6.0)}] +for $y in $x +return $y +) returning +let $x := create-circle($a.location,5.0) +order by $a.tweetid +return { + "x":$x, + "tweetid":$a.tweetid +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-return-records/insert-return-records.1.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-return-records/insert-return-records.1.adm 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-return-records/insert-return-records.1.adm new file mode 100644 index 0000000..2ee9b1c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-return-records/insert-return-records.1.adm @@ -0,0 +1,5 @@ +{ "tweetid": 1, "message-text": "hello" } +{ "tweetid": 2, "message-text": "goodbye" } +{ "tweetid": 4, "message-text": "what" } +{ "tweetid": 3, "message-text": "the end" } +{ "tweetid": 5, "message-text": "good" } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-returning-fieldname/insert-returning-fieldname.1.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-returning-fieldname/insert-returning-fieldname.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-returning-fieldname/insert-returning-fieldname.1.adm new file mode 100644 index 0000000..84ed78b --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-returning-fieldname/insert-returning-fieldname.1.adm @@ -0,0 +1 @@ +"hello" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/upsert-return-custom-result/upsert-return-custom-result.1.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/upsert-return-custom-result/upsert-return-custom-result.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/upsert-return-custom-result/upsert-return-custom-result.1.adm new file mode 100644 index 0000000..8706beb --- /dev/null +++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/upsert-return-custom-result/upsert-return-custom-result.1.adm @@ -0,0 +1,5 @@ +{ "x": circle("6.0,6.0 5.0"), "tweetid": 1 } +{ "x": circle("1.0,1.0 5.0"), "tweetid": 2 } +{ "x": circle("3.0,6.0 5.0"), "tweetid": 4 } +{ "x": circle("6.0,3.0 5.0"), "tweetid": 3 } +{ "x": circle("5.0,6.0 5.0"), "tweetid": 5 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml index 85884f8..d0e342c 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml @@ -1699,6 +1699,27 @@ </compilation-unit> </test-case> <test-case FilePath="dml"> + <compilation-unit name="insert-return-records"> + <output-dir compare="Text">insert-return-records</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="dml"> + <compilation-unit name="insert-returning-fieldname"> + <output-dir compare="Text">insert-returning-fieldname</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="dml"> + <compilation-unit name="insert-with-bad-return"> + <output-dir compare="Text">insert-with-bad-return</output-dir> + <expected-error>Error: Cannot use datasets in an insert returning query</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="dml"> + <compilation-unit name="upsert-return-custom-result"> + <output-dir compare="Text">upsert-return-custom-result</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="dml"> <compilation-unit name="insert-src-dst-01"> <output-dir compare="Text">insert-src-dst-01</output-dir> </compilation-unit> 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-doc/src/site/markdown/aql/manual.md ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-doc/src/site/markdown/aql/manual.md b/asterixdb/asterix-doc/src/site/markdown/aql/manual.md index 510bc83..393beec 100644 --- a/asterixdb/asterix-doc/src/site/markdown/aql/manual.md +++ b/asterixdb/asterix-doc/src/site/markdown/aql/manual.md @@ -824,7 +824,7 @@ data that has been prepared in ADM format. #### Insert - InsertStatement ::= "insert" "into" "dataset" QualifiedName Query + InsertStatement ::= "insert" "into" "dataset" QualifiedName ( "as" Variable )? Query ( "returning" Query )? The AQL insert statement is used to insert data into a dataset. The data to be inserted comes from an AQL query expression. @@ -834,13 +834,17 @@ being the insertion of a single object plus its affiliated secondary index entri If the query part of an insert returns a single object, then the insert statement itself will be a single, atomic transaction. If the query part returns multiple objects, then each object inserted will be handled independently -as a tranaction. If a dataset has an auto-generated primary key field, an insert statement should not include a value for that field in it. (The system will automatically extend the provided record with this additional field and a corresponding value.) +as a transaction. If a dataset has an auto-generated primary key field, an insert statement should not include a value for that field in it. (The system will automatically extend the provided record with this additional field and a corresponding value.) +The optional "as Variable" provides a variable binding for the inserted records, which can be used in the "returning" clause. +The optional "returning Query" allows users to run simple queries/functions on the records returned by the insert. +This query cannot refer to any datasets.
+ The following example illustrates a query-based insertion. ##### Example - insert into dataset UsersCopy (for $user in dataset FacebookUsers return $user) + insert into dataset UsersCopy as $inserted (for $user in dataset FacebookUsers return $user ) returning $inserted.screen-name #### Delete http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IActiveLifecycleEventSubscriber.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IActiveLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IActiveLifecycleEventSubscriber.java new file mode 100644 index 0000000..be9a245 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IActiveLifecycleEventSubscriber.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.feed.api; + +import org.apache.asterix.common.exceptions.AsterixException; + +/** Contract for objects that are handed ActiveLifecycleEvent notifications through handleEvent and that let a caller check for a given event through assertEvent. */ public interface IActiveLifecycleEventSubscriber { + + /** Events a subscriber can be notified of: six FEED_* values (intake/collect started, failure, ended) plus ACTIVE_JOB_STARTED, ACTIVE_JOB_ENDED and ACTIVE_JOB_FAILED. */ public enum ActiveLifecycleEvent { + FEED_INTAKE_STARTED, + FEED_COLLECT_STARTED, + FEED_INTAKE_FAILURE, + FEED_COLLECT_FAILURE, + FEED_INTAKE_ENDED, + FEED_COLLECT_ENDED, + ACTIVE_JOB_STARTED, + ACTIVE_JOB_ENDED, + ACTIVE_JOB_FAILED + } + + /** Checks for the given event; declared to throw AsterixException and InterruptedException. NOTE(review): whether this waits for the event and which events count as failures is decided by the implementing class, not by this interface - confirm against implementers. */ public void assertEvent(ActiveLifecycleEvent event) throws AsterixException, InterruptedException; + + /** Hands one event to this subscriber. */ public void handleEvent(ActiveLifecycleEvent event); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java deleted file mode 100644 index ad3c1c9..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied.
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.feed.api; - -import org.apache.asterix.common.exceptions.AsterixException; - -public interface IFeedLifecycleEventSubscriber { - - public enum FeedLifecycleEvent { - FEED_INTAKE_STARTED, - FEED_COLLECT_STARTED, - FEED_INTAKE_FAILURE, - FEED_COLLECT_FAILURE, - FEED_INTAKE_ENDED, - FEED_COLLECT_ENDED - } - - public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException; - - public void handleFeedEvent(FeedLifecycleEvent event); -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveLifecycleEventSubscriber.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveLifecycleEventSubscriber.java new file mode 100644 index 0000000..7f8efc5 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveLifecycleEventSubscriber.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.feed.management; + +import java.util.Iterator; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber; + +public class ActiveLifecycleEventSubscriber implements IActiveLifecycleEventSubscriber { + + private LinkedBlockingQueue<ActiveLifecycleEvent> inbox; + + public ActiveLifecycleEventSubscriber() { + this.inbox = new LinkedBlockingQueue<>(); + } + + @Override + public void handleEvent(ActiveLifecycleEvent event) { + inbox.add(event); + } + + @Override + public void assertEvent(ActiveLifecycleEvent event) throws AsterixException, InterruptedException { + boolean eventOccurred = false; + ActiveLifecycleEvent e; + Iterator<ActiveLifecycleEvent> eventsSoFar = inbox.iterator(); + while (eventsSoFar.hasNext()) { + e = eventsSoFar.next(); + assertNoFailure(e); + eventOccurred = e.equals(event); + } + + while (!eventOccurred) { + e = inbox.take(); + eventOccurred = e.equals(event); + if (!eventOccurred) { + assertNoFailure(e); + } + } + } + + private void assertNoFailure(ActiveLifecycleEvent e) throws AsterixException { + if (e.equals(ActiveLifecycleEvent.FEED_INTAKE_FAILURE) || e.equals(ActiveLifecycleEvent.FEED_COLLECT_FAILURE) + || e.equals(ActiveLifecycleEvent.ACTIVE_JOB_FAILED)) { + throw new AsterixException("Failure in active job."); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java index 0c49c92..02d2c8b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java @@ -34,8 +34,8 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.external.feed.api.FeedOperationCounter; import org.apache.asterix.external.feed.api.IFeedJoint; import org.apache.asterix.external.feed.api.IFeedJoint.State; -import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber; -import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent; +import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber; +import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; import org.apache.asterix.external.feed.watch.FeedConnectJobInfo; import org.apache.asterix.external.feed.watch.FeedIntakeInfo; @@ -63,7 +63,7 @@ import org.apache.log4j.Logger; public class FeedEventsListener implements IActiveEntityEventsListener { private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class); private final Map<EntityId, Pair<FeedOperationCounter, List<IFeedJoint>>> feedPipeline; - private final List<IFeedLifecycleEventSubscriber> subscribers; + private final List<IActiveLifecycleEventSubscriber> subscribers; private 
final Map<Long, ActiveJob> jobs; private final Map<Long, ActiveJob> intakeJobs; private final Map<EntityId, FeedIntakeInfo> entity2Intake; @@ -145,7 +145,7 @@ public class FeedEventsListener implements IActiveEntityEventsListener { case FEED_CONNECT: ((FeedConnectJobInfo) jobInfo).partitionStart(); if (((FeedConnectJobInfo) jobInfo).collectionStarted()) { - notifyFeedEventSubscribers(FeedLifecycleEvent.FEED_COLLECT_STARTED); + notifyFeedEventSubscribers(ActiveLifecycleEvent.FEED_COLLECT_STARTED); } break; case INTAKE: @@ -161,7 +161,7 @@ public class FeedEventsListener implements IActiveEntityEventsListener { if (feedPipeline.get(message.getEntityId()).first.decrementAndGet() == 0) { ((FeedIntakeInfo) jobInfo).getIntakeFeedJoint().setState(State.ACTIVE); jobInfo.setState(ActivityState.ACTIVE); - notifyFeedEventSubscribers(FeedLifecycleEvent.FEED_INTAKE_STARTED); + notifyFeedEventSubscribers(ActiveLifecycleEvent.FEED_INTAKE_STARTED); } } @@ -339,10 +339,10 @@ public class FeedEventsListener implements IActiveEntityEventsListener { return locations; } - private synchronized void notifyFeedEventSubscribers(FeedLifecycleEvent event) { + private synchronized void notifyFeedEventSubscribers(ActiveLifecycleEvent event) { if (subscribers != null && !subscribers.isEmpty()) { - for (IFeedLifecycleEventSubscriber subscriber : subscribers) { - subscriber.handleFeedEvent(event); + for (IActiveLifecycleEventSubscriber subscriber : subscribers) { + subscriber.handleEvent(event); } } } @@ -362,8 +362,8 @@ public class FeedEventsListener implements IActiveEntityEventsListener { // notify event listeners feedPipeline.remove(feedId); entity2Intake.remove(feedId); - notifyFeedEventSubscribers(pair.first.isFailedIngestion() ? FeedLifecycleEvent.FEED_INTAKE_FAILURE - : FeedLifecycleEvent.FEED_INTAKE_ENDED); + notifyFeedEventSubscribers(pair.first.isFailedIngestion() ? 
ActiveLifecycleEvent.FEED_INTAKE_FAILURE + : ActiveLifecycleEvent.FEED_INTAKE_ENDED); } private synchronized void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception { @@ -389,8 +389,8 @@ public class FeedEventsListener implements IActiveEntityEventsListener { connectJobInfos.remove(connectionId); jobs.remove(cInfo.getJobId().getId()); // notify event listeners - FeedLifecycleEvent event = - failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE : FeedLifecycleEvent.FEED_COLLECT_ENDED; + ActiveLifecycleEvent event = + failure ? ActiveLifecycleEvent.FEED_COLLECT_FAILURE : ActiveLifecycleEvent.FEED_COLLECT_ENDED; notifyFeedEventSubscribers(event); } @@ -569,16 +569,16 @@ public class FeedEventsListener implements IActiveEntityEventsListener { } - public synchronized void registerFeedEventSubscriber(IFeedLifecycleEventSubscriber subscriber) { + public synchronized void registerFeedEventSubscriber(IActiveLifecycleEventSubscriber subscriber) { subscribers.add(subscriber); } - public void deregisterFeedEventSubscriber(IFeedLifecycleEventSubscriber subscriber) { + public void deregisterFeedEventSubscriber(IActiveLifecycleEventSubscriber subscriber) { subscribers.remove(subscriber); } public synchronized boolean isFeedConnectionActive(FeedConnectionId connectionId, - IFeedLifecycleEventSubscriber eventSubscriber) { + IActiveLifecycleEventSubscriber eventSubscriber) { boolean active = false; FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId); if (cInfo != null) { @@ -643,7 +643,7 @@ public class FeedEventsListener implements IActiveEntityEventsListener { } @Override - public boolean isEntityConnectedToDataset(String dataverseName, String datasetName) { + public boolean isEntityUsingDataset(String dataverseName, String datasetName) { return isConnectedToDataset(datasetName); } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedLifecycleEventSubscriber.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedLifecycleEventSubscriber.java deleted file mode 100644 index 6e3cebce..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedLifecycleEventSubscriber.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.external.feed.management; - -import java.util.Iterator; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber; - -public class FeedLifecycleEventSubscriber implements IFeedLifecycleEventSubscriber { - - private LinkedBlockingQueue<FeedLifecycleEvent> inbox; - - public FeedLifecycleEventSubscriber() { - this.inbox = new LinkedBlockingQueue<FeedLifecycleEvent>(); - } - - @Override - public void handleFeedEvent(FeedLifecycleEvent event) { - inbox.add(event); - } - - @Override - public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException { - boolean eventOccurred = false; - FeedLifecycleEvent e = null; - Iterator<FeedLifecycleEvent> eventsSoFar = inbox.iterator(); - while (eventsSoFar.hasNext()) { - e = eventsSoFar.next(); - assertNoFailure(e); - eventOccurred = e.equals(event); - } - - while (!eventOccurred) { - e = inbox.take(); - eventOccurred = e.equals(event); - if (!eventOccurred) { - assertNoFailure(e); - } - } - } - - private void assertNoFailure(FeedLifecycleEvent e) throws AsterixException { - if (e.equals(FeedLifecycleEvent.FEED_INTAKE_FAILURE) || e.equals(FeedLifecycleEvent.FEED_COLLECT_FAILURE)) { - throw new AsterixException("Failure in feed"); - } - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java deleted file mode 100644 index aac6676..0000000 --- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.external.feed.watch; - -import java.util.Map; - -public class FeedActivity implements Comparable<FeedActivity> { - - private int activityId; - - private final String dataverseName; - private final String datasetName; - private final String feedName; - private final Map<String, String> feedActivityDetails; - - public static class FeedActivityDetails { - public static final String INTAKE_LOCATIONS = "intake-locations"; - public static final String COMPUTE_LOCATIONS = "compute-locations"; - public static final String STORAGE_LOCATIONS = "storage-locations"; - public static final String COLLECT_LOCATIONS = "collect-locations"; - public static final String FEED_POLICY_NAME = "feed-policy-name"; - public static final String FEED_CONNECT_TIMESTAMP = "feed-connect-timestamp"; - } - - public FeedActivity(String dataverseName, String feedName, String datasetName, - Map<String, String> feedActivityDetails) { - this.dataverseName = dataverseName; - this.feedName = feedName; - this.datasetName = datasetName; - 
this.feedActivityDetails = feedActivityDetails; - } - - public String getDataverseName() { - return dataverseName; - } - - public String getDatasetName() { - return datasetName; - } - - public String getFeedName() { - return feedName; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (!(other instanceof FeedActivity)) { - return false; - } - - if (!((FeedActivity) other).dataverseName.equals(dataverseName)) { - return false; - } - if (!((FeedActivity) other).datasetName.equals(datasetName)) { - return false; - } - if (!((FeedActivity) other).getFeedName().equals(feedName)) { - return false; - } - if (((FeedActivity) other).getActivityId() != (activityId)) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - return toString().hashCode(); - } - - @Override - public String toString() { - return dataverseName + "." + feedName + " --> " + datasetName + " " + activityId; - } - - public String getConnectTimestamp() { - return feedActivityDetails.get(FeedActivityDetails.FEED_CONNECT_TIMESTAMP); - } - - public int getActivityId() { - return activityId; - } - - public void setActivityId(int activityId) { - this.activityId = activityId; - } - - public Map<String, String> getFeedActivityDetails() { - return feedActivityDetails; - } - - @Override - public int compareTo(FeedActivity o) { - return o.getActivityId() - this.activityId; - } - -}