Repository: asterixdb Updated Branches: refs/heads/master 178e4f6d1 -> 5a61b2ada
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5a61b2ad/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java index c2d31bf..b2a7ca6 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java @@ -101,12 +101,15 @@ public class FeedMetadataUtil { public static void validateFeed(Feed feed, MetadataTransactionContext mdTxnCtx, ICcApplicationContext appCtx) throws AlgebricksException { try { - String adapterName = feed.getAdapterName(); - Map<String, String> configuration = feed.getAdapterConfiguration(); - ARecordType adapterOutputType = getOutputType(feed, configuration, ExternalDataConstants.KEY_TYPE_NAME); - ARecordType metaType = getOutputType(feed, configuration, ExternalDataConstants.KEY_META_TYPE_NAME); + Map<String, String> configuration = feed.getConfiguration(); + ARecordType adapterOutputType = getOutputType(feed, configuration.get(ExternalDataConstants.KEY_TYPE_NAME)); + ARecordType metaType = getOutputType(feed, configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME)); ExternalDataUtils.prepareFeed(configuration, feed.getDataverseName(), feed.getFeedName()); // Get adapter from metadata dataset <Metadata dataverse> + String adapterName = configuration.get(ExternalDataConstants.KEY_ADAPTER_NAME); + if (adapterName == null) { + throw new AlgebricksException("cannot find adatper name"); + } DatasourceAdapter adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName); // Get adapter from metadata dataset <The feed dataverse> @@ -140,7 +143,7 @@ public class FeedMetadataUtil { adapterOutputType, metaType); } if (metaType == null && configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME)) { - metaType = getOutputType(feed, configuration, ExternalDataConstants.KEY_META_TYPE_NAME); + metaType = getOutputType(feed, configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME)); if (metaType == null) { throw new AsterixException("Unknown specified feed meta output data type " + configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME)); @@ -150,7 +153,7 @@ public class FeedMetadataUtil { if (!configuration.containsKey(ExternalDataConstants.KEY_TYPE_NAME)) { throw new AsterixException("Unspecified feed output data type"); } - adapterOutputType = getOutputType(feed, configuration, ExternalDataConstants.KEY_TYPE_NAME); + adapterOutputType = getOutputType(feed, configuration.get(ExternalDataConstants.KEY_TYPE_NAME)); if (adapterOutputType == null) { throw new AsterixException("Unknown specified feed output data type " + configuration.get(ExternalDataConstants.KEY_TYPE_NAME)); @@ -162,7 +165,7 @@ public class FeedMetadataUtil { } @SuppressWarnings("rawtypes") - public static Triple<IAdapterFactory, RecordDescriptor, AdapterType> getPrimaryFeedFactoryAndOutput(Feed feed, + public static Triple<IAdapterFactory, RecordDescriptor, AdapterType> getFeedFactoryAndOutput(Feed feed, FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx, ICcApplicationContext appCtx) throws AlgebricksException { // This method needs to be re-visited @@ -175,11 +178,11 @@ public class FeedMetadataUtil { Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> feedProps = null; IDataSourceAdapter.AdapterType adapterType = null; try { - adapterName = feed.getAdapterName(); - Map<String, String> configuration = feed.getAdapterConfiguration(); + Map<String, String> configuration = feed.getConfiguration(); + adapterName = configuration.get(ExternalDataConstants.KEY_ADAPTER_NAME); configuration.putAll(policyAccessor.getFeedPolicy()); - adapterOutputType = getOutputType(feed, configuration, ExternalDataConstants.KEY_TYPE_NAME); - metaType = getOutputType(feed, configuration, ExternalDataConstants.KEY_META_TYPE_NAME); + adapterOutputType = getOutputType(feed, configuration.get(ExternalDataConstants.KEY_TYPE_NAME)); + metaType = getOutputType(feed, configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME)); ExternalDataUtils.prepareFeed(configuration, feed.getDataverseName(), feed.getFeedName()); // Get adapter from metadata dataset <Metadata dataverse> adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, @@ -214,13 +217,13 @@ public class FeedMetadataUtil { adapterType = IDataSourceAdapter.AdapterType.INTERNAL; } if (metaType == null) { - metaType = getOutputType(feed, configuration, ExternalDataConstants.KEY_META_TYPE_NAME); + metaType = getOutputType(feed, configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME)); } if (adapterOutputType == null) { if (!configuration.containsKey(ExternalDataConstants.KEY_TYPE_NAME)) { throw new AsterixException("Unspecified feed output data type"); } - adapterOutputType = getOutputType(feed, configuration, ExternalDataConstants.KEY_TYPE_NAME); + adapterOutputType = getOutputType(feed, configuration.get(ExternalDataConstants.KEY_TYPE_NAME)); } int numOfOutputs = 1; if (metaType != null) { @@ -272,10 +275,9 @@ public class FeedMetadataUtil { } } - public static ARecordType getOutputType(IFeed feed, Map<String, String> configuration, String key) + public static ARecordType getOutputType(IFeed feed, String fqOutputType) throws AlgebricksException { ARecordType outputType = null; - String fqOutputType = configuration.get(key); if (fqOutputType == null) { return null; @@ -291,7 +293,7 @@ public class FeedMetadataUtil { dataverseName = dataverseAndType[0]; datatypeName = dataverseAndType[1]; } else { - throw new IllegalArgumentException("Invalid value for the parameter " + key); + throw new IllegalArgumentException("Invalid parameter value " + fqOutputType); } MetadataTransactionContext ctx = null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5a61b2ad/asterixdb/asterix-server/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql b/asterixdb/asterix-server/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql index f188629..a24966b 100644 --- a/asterixdb/asterix-server/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql +++ b/asterixdb/asterix-server/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql @@ -36,6 +36,8 @@ create dataset TweetsTestAdapter(TestTypedAdapterOutputType) primary key tweetid; create feed TestTypedAdapterFeed -using "testlib#test_typed_adapter" ( -("num_output_records"="5"), -("type-name"="TestTypedAdapterOutputType")); +with { + "adapter-name" : "testlib#test_typed_adapter", + "num_output_records" : "5", + "type-name" : "TestTypedAdapterOutputType" +}; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5a61b2ad/asterixdb/asterix-server/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql b/asterixdb/asterix-server/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql index 5f3d322..8772ea8 100644 --- a/asterixdb/asterix-server/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql +++ b/asterixdb/asterix-server/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql @@ -44,11 +44,12 @@ create type TweetOutputType as closed { topics : {{string}} } -create feed TweetFeed -using localfs -(("type-name"="TweetInputType"), -("path"="asterix_nc1://../../../../../../asterix-app/data/twitter/obamatweets.adm"), -("format"="adm")); +create feed TweetFeed with { + "adapter-name" : "localfs", + "type-name" : "TweetInputType", + "path" : "asterix_nc1://../../../../../../asterix-app/data/twitter/obamatweets.adm", + "format" : "adm" +}; create dataset TweetsFeedIngest(TweetOutputType) primary key id; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5a61b2ad/asterixdb/asterix-server/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.01.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.01.ddl.aql b/asterixdb/asterix-server/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.01.ddl.aql index 190891f..e3273bc 100644 --- a/asterixdb/asterix-server/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.01.ddl.aql +++ b/asterixdb/asterix-server/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.01.ddl.aql @@ -75,7 +75,10 @@ create type typeTweet if not exists as open{ create dataset ds_tweet(typeTweet) if not exists primary key id with filter on create_at; create index text_idx if not exists on ds_tweet("text") type keyword; -create feed MessageFeed using localfs( -("path"="localhost://../../../../../src/test/resources/integrationts/restart/828.h1w.adm"), -("format"="adm"), -("type-name"="typeTweet")); + +create feed MessageFeed with { + "adapter-name" : "localfs", + "path" : "localhost://../../../../../src/test/resources/integrationts/restart/828.h1w.adm", + "format" : "adm", + "type-name" : "typeTweet" +}; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5a61b2ad/asterixdb/asterix-server/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.04.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-server/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.04.ddl.aql b/asterixdb/asterix-server/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.04.ddl.aql index fe30266..67fead1 100644 --- a/asterixdb/asterix-server/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.04.ddl.aql +++ b/asterixdb/asterix-server/src/test/resources/integrationts/restart/queries/feed-restart/issue-1636/issue-1636.04.ddl.aql @@ -25,12 +25,13 @@ use dataverse twitter; drop feed TweetFeed if exists; -create feed TweetFeed using localfs -( - ("path"="localhost://../../../../../target/tweets.json"), - ("format"="adm"), - ("type-name"="typeTweet") -); +create feed TweetFeed with { + "adapter-name" : "localfs", + "path" : "localhost://../../../../../target/tweets.json", + "format" : "adm", + "type-name" : "typeTweet" +}; + set wait-for-completion-feed "true"; connect feed TweetFeed to dataset ds_tweet; start feed TweetFeed; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5a61b2ad/asterixdb/asterix-server/src/test/resources/transactionts/queries/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.2.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.2.ddl.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.2.ddl.aql index 2c74ac7..98b9845 100644 --- a/asterixdb/asterix-server/src/test/resources/transactionts/queries/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.2.ddl.aql +++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.2.ddl.aql @@ -42,15 +42,16 @@ lockTime:int32 create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta()."key"; -create feed KVChangeStream using adapter( - ("type-name"="DocumentType"), - ("meta-type-name"="KVMetaType"), - ("reader"="org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory"), - ("parser"="record-with-metadata"), - ("format"="dcp"), - ("record-format"="json"), - ("change-feed"="true"), - ("key-indexes"="0"), - ("key-indicators"="1"), - ("num-of-records"="1000") -); \ No newline at end of file +create feed KVChangeStream with { + "adapter-name" : "adapter", + "type-name" : "DocumentType", + "meta-type-name" : "KVMetaType", + "reader" : "org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory", + "parser" : "record-with-metadata", + "format" : "dcp", + "record-format" : "json", + "change-feed" : "true", + "key-indexes" : "0", + "key-indicators" : "1", + "num-of-records" : "1000" +}; \ No newline at end of file