http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java index 78a7d6f..01ade10 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java @@ -35,6 +35,7 @@ import org.apache.asterix.metadata.entities.DatasourceAdapter; import org.apache.asterix.metadata.entities.Datatype; import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.entities.FeedConnection; import org.apache.asterix.metadata.entities.FeedPolicyEntity; import org.apache.asterix.metadata.entities.Function; import org.apache.asterix.metadata.entities.Index; @@ -53,28 +54,34 @@ public class MetadataCache { // Default life time period of a temp dataset. It is 30 days. private final static long TEMP_DATASET_INACTIVE_TIME_THRESHOLD = 3600 * 24 * 30 * 1000L; // Key is dataverse name. - protected final Map<String, Dataverse> dataverses = new HashMap<String, Dataverse>(); + protected final Map<String, Dataverse> dataverses = new HashMap<>(); // Key is dataverse name. Key of value map is dataset name. - protected final Map<String, Map<String, Dataset>> datasets = new HashMap<String, Map<String, Dataset>>(); + protected final Map<String, Map<String, Dataset>> datasets = new HashMap<>(); // Key is dataverse name. Key of value map is dataset name. Key of value map of value map is index name. 
- protected final Map<String, Map<String, Map<String, Index>>> indexes = new HashMap<String, Map<String, Map<String, Index>>>(); + protected final Map<String, Map<String, Map<String, Index>>> indexes = + new HashMap<>(); // Key is dataverse name. Key of value map is datatype name. - protected final Map<String, Map<String, Datatype>> datatypes = new HashMap<String, Map<String, Datatype>>(); + protected final Map<String, Map<String, Datatype>> datatypes = new HashMap<>(); // Key is dataverse name. - protected final Map<String, NodeGroup> nodeGroups = new HashMap<String, NodeGroup>(); + protected final Map<String, NodeGroup> nodeGroups = new HashMap<>(); // Key is function Identifier . Key of value map is function name. - protected final Map<FunctionSignature, Function> functions = new HashMap<FunctionSignature, Function>(); + protected final Map<FunctionSignature, Function> functions = new HashMap<>(); // Key is adapter dataverse. Key of value map is the adapter name - protected final Map<String, Map<String, DatasourceAdapter>> adapters = new HashMap<String, Map<String, DatasourceAdapter>>(); + protected final Map<String, Map<String, DatasourceAdapter>> adapters = + new HashMap<>(); // Key is DataverseName, Key of the value map is the Policy name - protected final Map<String, Map<String, FeedPolicyEntity>> feedPolicies = new HashMap<String, Map<String, FeedPolicyEntity>>(); + protected final Map<String, Map<String, FeedPolicyEntity>> feedPolicies = + new HashMap<>(); // Key is library dataverse. Key of value map is the library name - protected final Map<String, Map<String, Library>> libraries = new HashMap<String, Map<String, Library>>(); + protected final Map<String, Map<String, Library>> libraries = new HashMap<>(); // Key is library dataverse. 
Key of value map is the feed name - protected final Map<String, Map<String, Feed>> feeds = new HashMap<String, Map<String, Feed>>(); + protected final Map<String, Map<String, Feed>> feeds = new HashMap<>(); // Key is DataverseName, Key of the value map is the Policy name - protected final Map<String, Map<String, CompactionPolicy>> compactionPolicies = new HashMap<String, Map<String, CompactionPolicy>>(); + protected final Map<String, Map<String, CompactionPolicy>> compactionPolicies = + new HashMap<>(); + // Key is DataverseName, Key of value map is feedConnectionId + protected final Map<String, Map<String, FeedConnection>> feedConnections = new HashMap<>(); // Atomically executes all metadata operations in ctx's log. public void commit(MetadataTransactionContext ctx) { @@ -163,7 +170,7 @@ public class MetadataCache { Map<String, Dataset> m = datasets.get(dataset.getDataverseName()); if (m == null) { - m = new HashMap<String, Dataset>(); + m = new HashMap<>(); datasets.put(dataset.getDataverseName(), m); } if (!m.containsKey(dataset.getDatasetName())) { @@ -184,7 +191,7 @@ public class MetadataCache { synchronized (datatypes) { Map<String, Datatype> m = datatypes.get(datatype.getDataverseName()); if (m == null) { - m = new HashMap<String, Datatype>(); + m = new HashMap<>(); datatypes.put(datatype.getDataverseName(), m); } if (!m.containsKey(datatype.getDatatypeName())) { @@ -207,7 +214,7 @@ public class MetadataCache { synchronized (compactionPolicy) { Map<String, CompactionPolicy> p = compactionPolicies.get(compactionPolicy.getDataverseName()); if (p == null) { - p = new HashMap<String, CompactionPolicy>(); + p = new HashMap<>(); p.put(compactionPolicy.getPolicyName(), compactionPolicy); compactionPolicies.put(compactionPolicy.getDataverseName(), p); } else { @@ -244,7 +251,8 @@ public class MetadataCache { datatypes.remove(dataverse.getDataverseName()); adapters.remove(dataverse.getDataverseName()); compactionPolicies.remove(dataverse.getDataverseName()); - 
List<FunctionSignature> markedFunctionsForRemoval = new ArrayList<FunctionSignature>(); + List<FunctionSignature> markedFunctionsForRemoval = + new ArrayList<>(); for (FunctionSignature signature : functions.keySet()) { if (signature.getNamespace().equals(dataverse.getDataverseName())) { markedFunctionsForRemoval.add(signature); @@ -371,7 +379,7 @@ public class MetadataCache { } public List<Dataset> getDataverseDatasets(String dataverseName) { - List<Dataset> retDatasets = new ArrayList<Dataset>(); + List<Dataset> retDatasets = new ArrayList<>(); synchronized (datasets) { Map<String, Dataset> m = datasets.get(dataverseName); if (m == null) { @@ -385,7 +393,7 @@ public class MetadataCache { } public List<Index> getDatasetIndexes(String dataverseName, String datasetName) { - List<Index> retIndexes = new ArrayList<Index>(); + List<Index> retIndexes = new ArrayList<>(); synchronized (datasets) { Map<String, Index> map = indexes.get(dataverseName).get(datasetName); if (map == null) { @@ -398,28 +406,13 @@ public class MetadataCache { } } - /** - * Represents a logical operation against the metadata. - */ - protected class MetadataLogicalOperation { - // Entity to be added/dropped. - public final IMetadataEntity<?> entity; - // True for add, false for drop. 
- public final boolean isAdd; - - public MetadataLogicalOperation(IMetadataEntity<?> entity, boolean isAdd) { - this.entity = entity; - this.isAdd = isAdd; - } - }; - protected void doOperation(MetadataLogicalOperation op) { if (op.isAdd) { op.entity.addToCache(this); } else { op.entity.dropFromCache(this); } - } + }; protected void undoOperation(MetadataLogicalOperation op) { if (!op.isAdd) { @@ -431,8 +424,8 @@ public class MetadataCache { public Function addFunctionIfNotExists(Function function) { synchronized (functions) { - FunctionSignature signature = new FunctionSignature(function.getDataverseName(), function.getName(), - function.getArity()); + FunctionSignature signature = + new FunctionSignature(function.getDataverseName(), function.getName(), function.getArity()); Function fun = functions.get(signature); if (fun == null) { return functions.put(signature, function); @@ -443,8 +436,8 @@ public class MetadataCache { public Function dropFunction(Function function) { synchronized (functions) { - FunctionSignature signature = new FunctionSignature(function.getDataverseName(), function.getName(), - function.getArity()); + FunctionSignature signature = + new FunctionSignature(function.getDataverseName(), function.getName(), function.getArity()); Function fun = functions.get(signature); if (fun == null) { return null; @@ -457,7 +450,7 @@ public class MetadataCache { synchronized (feedPolicy) { Map<String, FeedPolicyEntity> p = feedPolicies.get(feedPolicy.getDataverseName()); if (p == null) { - p = new HashMap<String, FeedPolicyEntity>(); + p = new HashMap<>(); p.put(feedPolicy.getPolicyName(), feedPolicy); feedPolicies.put(feedPolicy.getDataverseName(), p); } else { @@ -481,10 +474,10 @@ public class MetadataCache { public DatasourceAdapter addAdapterIfNotExists(DatasourceAdapter adapter) { synchronized (adapters) { - Map<String, DatasourceAdapter> adaptersInDataverse = adapters - .get(adapter.getAdapterIdentifier().getNamespace()); + Map<String, 
DatasourceAdapter> adaptersInDataverse = + adapters.get(adapter.getAdapterIdentifier().getNamespace()); if (adaptersInDataverse == null) { - adaptersInDataverse = new HashMap<String, DatasourceAdapter>(); + adaptersInDataverse = new HashMap<>(); adapters.put(adapter.getAdapterIdentifier().getNamespace(), adaptersInDataverse); } DatasourceAdapter adapterObject = adaptersInDataverse.get(adapter.getAdapterIdentifier().getName()); @@ -497,8 +490,8 @@ public class MetadataCache { public DatasourceAdapter dropAdapter(DatasourceAdapter adapter) { synchronized (adapters) { - Map<String, DatasourceAdapter> adaptersInDataverse = adapters - .get(adapter.getAdapterIdentifier().getNamespace()); + Map<String, DatasourceAdapter> adaptersInDataverse = + adapters.get(adapter.getAdapterIdentifier().getNamespace()); if (adaptersInDataverse != null) { return adaptersInDataverse.remove(adapter.getAdapterIdentifier().getName()); } @@ -512,7 +505,7 @@ public class MetadataCache { boolean needToAddd = (libsInDataverse == null || libsInDataverse.get(library.getName()) != null); if (needToAddd) { if (libsInDataverse == null) { - libsInDataverse = new HashMap<String, Library>(); + libsInDataverse = new HashMap<>(); libraries.put(library.getDataverseName(), libsInDataverse); } return libsInDataverse.put(library.getDataverseName(), library); @@ -531,8 +524,37 @@ public class MetadataCache { } } + public FeedConnection addFeedConnectionIfNotExists(FeedConnection feedConnection) { + synchronized (feedConnections) { + Map<String, FeedConnection> feedConnsInDataverse = feedConnections.get(feedConnection.getDataverseName()); + if (feedConnsInDataverse == null) { + feedConnections.put(feedConnection.getDataverseName(), new HashMap<>()); + feedConnsInDataverse = feedConnections.get(feedConnection.getDataverseName()); + } + return feedConnsInDataverse.put(feedConnection.getConnectionId(), feedConnection); + } + } + + public FeedConnection dropFeedConnection(FeedConnection feedConnection) { + 
synchronized (feedConnections) { + Map<String, FeedConnection> feedConnsInDataverse = feedConnections.get(feedConnection.getDataverseName()); + if (feedConnsInDataverse != null) { + return feedConnsInDataverse.remove(feedConnection.getConnectionId()); + } else { + return null; + } + } + } + public Feed addFeedIfNotExists(Feed feed) { - return null; + synchronized (feeds) { + Map<String, Feed> feedsInDataverse = feeds.get(feed.getDataverseName()); + if (feedsInDataverse == null) { + feeds.put(feed.getDataverseName(), new HashMap<>()); + feedsInDataverse = feeds.get(feed.getDataverseName()); + } + return feedsInDataverse.put(feed.getFeedName(), feed); + } } public Feed dropFeed(Feed feed) { @@ -548,12 +570,12 @@ public class MetadataCache { private Index addIndexIfNotExistsInternal(Index index) { Map<String, Map<String, Index>> datasetMap = indexes.get(index.getDataverseName()); if (datasetMap == null) { - datasetMap = new HashMap<String, Map<String, Index>>(); + datasetMap = new HashMap<>(); indexes.put(index.getDataverseName(), datasetMap); } Map<String, Index> indexMap = datasetMap.get(index.getDatasetName()); if (indexMap == null) { - indexMap = new HashMap<String, Index>(); + indexMap = new HashMap<>(); datasetMap.put(index.getDatasetName(), indexMap); } if (!indexMap.containsKey(index.getIndexName())) { @@ -583,4 +605,19 @@ public class MetadataCache { } } } + + /** + * Represents a logical operation against the metadata. + */ + protected class MetadataLogicalOperation { + // Entity to be added/dropped. + public final IMetadataEntity<?> entity; + // True for add, false for drop. + public final boolean isAdd; + + public MetadataLogicalOperation(IMetadataEntity<?> entity, boolean isAdd) { + this.entity = entity; + this.isAdd = isAdd; + } + } }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java index 59911d1..5d44d0b 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java @@ -42,6 +42,7 @@ import org.apache.asterix.metadata.entities.DatasourceAdapter; import org.apache.asterix.metadata.entities.Datatype; import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.entities.FeedConnection; import org.apache.asterix.metadata.entities.FeedPolicyEntity; import org.apache.asterix.metadata.entities.Function; import org.apache.asterix.metadata.entities.Index; @@ -779,10 +780,16 @@ public class MetadataManager implements IMetadataManager { @Override public void dropFeed(MetadataTransactionContext ctx, String dataverse, String feedName) throws MetadataException { - Feed feed; + Feed feed = null; + List<FeedConnection> feedConnections = null; try { feed = metadataNode.getFeed(ctx.getJobId(), dataverse, feedName); + feedConnections = metadataNode.getFeedConnections(ctx.getJobId(), dataverse, feedName); metadataNode.dropFeed(ctx.getJobId(), dataverse, feedName); + for (FeedConnection feedConnection : feedConnections) { + metadataNode.dropFeedConnection(ctx.getJobId(), dataverse, feedName, feedConnection.getDatasetName()); + ctx.dropFeedConnection(dataverse, feedName, feedConnection.getDatasetName()); + } } catch (RemoteException e) { throw new MetadataException(e); } @@ -800,6 +807,48 @@ public class MetadataManager implements 
IMetadataManager { } @Override + public void addFeedConnection(MetadataTransactionContext ctx, FeedConnection feedConnection) + throws MetadataException { + try { + metadataNode.addFeedConnection(ctx.getJobId(), feedConnection); + } catch (RemoteException e) { + throw new MetadataException(e); + } + ctx.addFeedConnection(feedConnection); + } + + @Override + public void dropFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName, + String datasetName) throws MetadataException { + try { + metadataNode.dropFeedConnection(ctx.getJobId(), dataverseName, feedName, datasetName); + } catch (RemoteException e) { + throw new MetadataException(e); + } + ctx.dropFeedConnection(dataverseName, feedName, datasetName); + } + + @Override + public FeedConnection getFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName, + String datasetName) throws MetadataException { + try { + return metadataNode.getFeedConnection(ctx.getJobId(), dataverseName, feedName, datasetName); + } catch (RemoteException e) { + throw new MetadataException(e); + } + } + + @Override + public List<FeedConnection> getFeedConections(MetadataTransactionContext ctx, String dataverseName, String feedName) + throws MetadataException { + try { + return metadataNode.getFeedConnections(ctx.getJobId(), dataverseName, feedName); + } catch (RemoteException e) { + throw new MetadataException(e); + } + } + + @Override public List<DatasourceAdapter> getDataverseAdapters(MetadataTransactionContext mdTxnCtx, String dataverse) throws MetadataException { List<DatasourceAdapter> dataverseAdapters; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java index 8ecc0ed..51790e6 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java @@ -60,6 +60,7 @@ import org.apache.asterix.metadata.entities.DatasourceAdapter; import org.apache.asterix.metadata.entities.Datatype; import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.entities.FeedConnection; import org.apache.asterix.metadata.entities.FeedPolicyEntity; import org.apache.asterix.metadata.entities.Function; import org.apache.asterix.metadata.entities.Index; @@ -73,6 +74,7 @@ import org.apache.asterix.metadata.entitytupletranslators.DatasourceAdapterTuple import org.apache.asterix.metadata.entitytupletranslators.DatatypeTupleTranslator; import org.apache.asterix.metadata.entitytupletranslators.DataverseTupleTranslator; import org.apache.asterix.metadata.entitytupletranslators.ExternalFileTupleTranslator; +import org.apache.asterix.metadata.entitytupletranslators.FeedConnectionTupleTranslator; import org.apache.asterix.metadata.entitytupletranslators.FeedPolicyTupleTranslator; import org.apache.asterix.metadata.entitytupletranslators.FeedTupleTranslator; import org.apache.asterix.metadata.entitytupletranslators.FunctionTupleTranslator; @@ -487,11 +489,16 @@ public class MetadataNode implements IMetadataNode { } List<Feed> dataverseFeeds; + List<FeedConnection> feedConnections; Feed feed; dataverseFeeds = getDataverseFeeds(jobId, dataverseName); - // Drop all datasets in this dataverse. + // Drop all feeds&connections in this dataverse. 
for (int i = 0; i < dataverseFeeds.size(); i++) { feed = dataverseFeeds.get(i); + feedConnections = getFeedConnections(jobId, dataverseName, feed.getFeedName()); + for (FeedConnection feedConnection : feedConnections) { + dropFeedConnection(jobId, dataverseName, feed.getFeedName(), feedConnection.getDatasetName()); + } dropFeed(jobId, dataverseName, feed.getFeedName()); } @@ -1480,6 +1487,63 @@ public class MetadataNode implements IMetadataNode { } @Override + public void addFeedConnection(JobId jobId, FeedConnection feedConnection) throws MetadataException { + try { + FeedConnectionTupleTranslator tupleReaderWriter = new FeedConnectionTupleTranslator(true); + ITupleReference feedConnTuple = tupleReaderWriter.getTupleFromMetadataEntity(feedConnection); + insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET, feedConnTuple); + } catch (IndexException | ACIDException | IOException e) { + throw new MetadataException(e); + } + } + + @Override + public List<FeedConnection> getFeedConnections(JobId jobId, String dataverseName, String feedName) + throws MetadataException { + try { + ITupleReference searchKey = createTuple(dataverseName, feedName); + FeedConnectionTupleTranslator tupleReaderWriter = new FeedConnectionTupleTranslator(false); + List<FeedConnection> results = new ArrayList<>(); + IValueExtractor<FeedConnection> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter); + searchIndex(jobId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET, searchKey, valueExtractor, results); + return results; + } catch (IndexException | IOException e) { + throw new MetadataException(e); + } + } + + @Override + public FeedConnection getFeedConnection(JobId jobId, String dataverseName, String feedName, String datasetName) + throws MetadataException { + try { + ITupleReference searchKey = createTuple(dataverseName, feedName, datasetName); + FeedConnectionTupleTranslator tupleReaderWriter = new FeedConnectionTupleTranslator(false); + 
List<FeedConnection> results = new ArrayList<>(); + IValueExtractor<FeedConnection> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter); + searchIndex(jobId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET, searchKey, valueExtractor, results); + if (!results.isEmpty()) { + return results.get(0); + } + return null; + } catch (IndexException | IOException e) { + throw new MetadataException(e); + } + } + + @Override + public void dropFeedConnection(JobId jobId, String dataverseName, String feedName, String datasetName) + throws MetadataException { + try { + ITupleReference searchKey = createTuple(dataverseName, feedName, datasetName); + ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET, + searchKey); + deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET, tuple); + } catch (IndexException | IOException | ACIDException e) { + throw new MetadataException(e); + } + } + + @Override public void addFeed(JobId jobId, Feed feed) throws MetadataException, RemoteException { try { // Insert into the 'Feed' dataset. 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java index 87c1c473..b2ec7f2 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java @@ -24,14 +24,13 @@ import java.util.ArrayList; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.external.dataset.adapter.AdapterIdentifier; -import org.apache.asterix.external.feed.api.IFeed; -import org.apache.asterix.external.feed.api.IFeed.FeedType; import org.apache.asterix.metadata.entities.CompactionPolicy; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.DatasourceAdapter; import org.apache.asterix.metadata.entities.Datatype; import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.entities.FeedConnection; import org.apache.asterix.metadata.entities.FeedPolicyEntity; import org.apache.asterix.metadata.entities.Function; import org.apache.asterix.metadata.entities.Index; @@ -231,17 +230,26 @@ public class MetadataTransactionContext extends MetadataCache { public void addFeed(Feed feed) { droppedCache.dropFeed(feed); logAndApply(new MetadataLogicalOperation(feed, true)); - } - public void dropFeed(String dataverseName, String feedName, IFeed.FeedType feedType) { + public void dropFeed(String dataverseName, String feedName) { Feed feed = null; - feed = new 
Feed(dataverseName, feedName, null, feedType, (feedType == FeedType.PRIMARY) ? feedName : null, - null, null); + feed = new Feed(dataverseName, feedName, null, null); droppedCache.addFeedIfNotExists(feed); logAndApply(new MetadataLogicalOperation(feed, false)); } + public void addFeedConnection(FeedConnection feedConnection) { + droppedCache.dropFeedConnection(feedConnection); + logAndApply(new MetadataLogicalOperation(feedConnection, true)); + } + + public void dropFeedConnection(String dataverseName, String feedName, String datasetName) { + FeedConnection feedConnection = new FeedConnection(dataverseName, feedName, datasetName, null, null, null); + droppedCache.addFeedConnectionIfNotExists(feedConnection); + logAndApply(new MetadataLogicalOperation(feedConnection, false)); + } + @Override public void clear() { super.clear(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java index feb4db0..bd1c7d1 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java @@ -34,6 +34,7 @@ import org.apache.asterix.metadata.entities.DatasourceAdapter; import org.apache.asterix.metadata.entities.Datatype; import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.entities.FeedConnection; import org.apache.asterix.metadata.entities.FeedPolicyEntity; import org.apache.asterix.metadata.entities.Function; import org.apache.asterix.metadata.entities.Index; @@ -431,7 +432,6 @@ public 
interface IMetadataManager extends IMetadataBootstrap { void dropAdapter(MetadataTransactionContext ctx, String dataverseName, String name) throws MetadataException; /** - * * @param ctx * MetadataTransactionContext of an active metadata transaction. * @param dataverseName @@ -691,4 +691,19 @@ public interface IMetadataManager extends IMetadataBootstrap { * rebind it */ void rebindMetadataNode(); + + /** + * Feed Connection Related Metadata operations + */ + void addFeedConnection(MetadataTransactionContext ctx, FeedConnection feedConnection) + throws MetadataException; + + void dropFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName, + String datasetName) throws MetadataException; + + FeedConnection getFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName, + String datasetName) throws MetadataException; + + List<FeedConnection> getFeedConections(MetadataTransactionContext ctx, String dataverseName, String feedName) + throws MetadataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java index 41d0b6a..21b170d 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java @@ -35,6 +35,7 @@ import org.apache.asterix.metadata.entities.DatasourceAdapter; import org.apache.asterix.metadata.entities.Datatype; import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.entities.FeedConnection; import 
org.apache.asterix.metadata.entities.FeedPolicyEntity; import org.apache.asterix.metadata.entities.Function; import org.apache.asterix.metadata.entities.Index; @@ -764,4 +765,14 @@ public interface IMetadataNode extends Remote, Serializable { <T extends IExtensionMetadataEntity> List<T> getEntities(JobId jobId, IExtensionMetadataSearchKey searchKey) throws MetadataException, RemoteException; + void addFeedConnection(JobId jobId, FeedConnection feedConnection) throws MetadataException, RemoteException; + + FeedConnection getFeedConnection(JobId jobId, String dataverseName, String feedName, String datasetName) + throws MetadataException, RemoteException; + + void dropFeedConnection(JobId jobId, String dataverseName, String feedName, String datasetName) + throws MetadataException, RemoteException; + + List<FeedConnection> getFeedConnections(JobId jobId, String dataverseName, String feedName) + throws MetadataException, RemoteException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java index 6cd1f8b..02a092d 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java @@ -123,7 +123,7 @@ public class MetadataBootstrap { MetadataPrimaryIndexes.FUNCTION_DATASET, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, MetadataPrimaryIndexes.FEED_DATASET, MetadataPrimaryIndexes.FEED_POLICY_DATASET, MetadataPrimaryIndexes.LIBRARY_DATASET, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET, - 
MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET }; + MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET }; private MetadataBootstrap() { } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java index 833f3e5..d2a4b1c 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java @@ -53,8 +53,8 @@ public class MetadataPrimaryIndexes { new MetadataIndexImmutableProperties("Library", 9, 9); public static final MetadataIndexImmutableProperties PROPERTIES_FEED = new MetadataIndexImmutableProperties("Feed", 10, 10); - public static final MetadataIndexImmutableProperties PROPERTIES_FEED_ACTIVITY_DATASET_ID = - new MetadataIndexImmutableProperties("FeedActivity", 11, 11); + public static final MetadataIndexImmutableProperties PROPERTIES_FEED_CONNECTION = + new MetadataIndexImmutableProperties("FeedConnection", 11, 11); public static final MetadataIndexImmutableProperties PROPERTIES_FEED_POLICY = new MetadataIndexImmutableProperties("FeedPolicy", 12, 12); public static final MetadataIndexImmutableProperties PROPERTIES_COMPACTION_POLICY = @@ -129,6 +129,13 @@ public class MetadataPrimaryIndexes { Arrays.asList(MetadataRecordTypes.FIELD_NAME_FILE_NUMBER)), 0, MetadataRecordTypes.EXTERNAL_FILE_RECORDTYPE, true, new int[] { 0, 1, 2 }); + public static final IMetadataIndex FEED_CONNECTION_DATASET = new MetadataIndex(PROPERTIES_FEED_CONNECTION, 4, + new IAType[] { 
BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, + Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME), + Arrays.asList(MetadataRecordTypes.FIELD_NAME_FEED_NAME), + Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATASET_NAME)), + 0, MetadataRecordTypes.FEED_CONNECTION_RECORDTYPE, true, new int[] { 0, 1, 2 }); + private MetadataPrimaryIndexes() { } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java index a783c63..2a04b58 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java @@ -61,7 +61,6 @@ public final class MetadataRecordTypes { public static final String FIELD_NAME_FILE_NUMBER = "FileNumber"; public static final String FIELD_NAME_FILE_SIZE = "FileSize"; public static final String FIELD_NAME_FILE_STRUCTURE = "FileStructure"; - public static final String FIELD_NAME_FUNCTION = "Function"; public static final String FIELD_NAME_GROUP_NAME = "GroupName"; public static final String FIELD_NAME_HINTS = "Hints"; public static final String FIELD_NAME_INDEX_NAME = "IndexName"; @@ -87,13 +86,10 @@ public final class MetadataRecordTypes { public static final String FIELD_NAME_PENDING_OP = "PendingOp"; public static final String FIELD_NAME_POLICY_NAME = "PolicyName"; public static final String FIELD_NAME_PRIMARY_KEY = "PrimaryKey"; - public static final String FIELD_NAME_PRIMARY_TYPE_DETAILS = "PrimaryTypeDetails"; public static final String FIELD_NAME_PROPERTIES = 
"Properties"; public static final String FIELD_NAME_RECORD = "Record"; public static final String FIELD_NAME_RETURN_TYPE = "ReturnType"; public static final String FIELD_NAME_SEARCH_KEY = "SearchKey"; - public static final String FIELD_NAME_SECONDARY_TYPE_DETAILS = "SecondaryTypeDetails"; - public static final String FIELD_NAME_SOURCE_FEED_NAME = "SourceFeedName"; public static final String FIELD_NAME_STATUS = "Status"; public static final String FIELD_NAME_TAG = "Tag"; public static final String FIELD_NAME_TIMESTAMP = "Timestamp"; @@ -102,6 +98,7 @@ public final class MetadataRecordTypes { public static final String FIELD_NAME_UNORDERED_LIST = "UnorderedList"; public static final String FIELD_NAME_VALUE = "Value"; public static final String FIELD_NAME_WORKING_MEMORY_SIZE = "WorkingMemorySize"; + public static final String FIELD_NAME_APPLIED_FUNCTIONS = "AppliedFunctions"; //---------------------------------- Record Types Creation ----------------------------------// //--------------------------------------- Properties ----------------------------------------// @@ -148,35 +145,6 @@ public final class MetadataRecordTypes { BuiltinType.ADATETIME, BuiltinType.AINT32 }, //IsOpen? 
true); - //-------------------------------------- Feed Details ---------------------------------------// - public static final int FEED_DETAILS_ARECORD_FILESTRUCTURE_FIELD_INDEX = 0; - public static final int FEED_DETAILS_ARECORD_PARTITIONSTRATEGY_FIELD_INDEX = 1; - public static final int FEED_DETAILS_ARECORD_PARTITIONKEY_FIELD_INDEX = 2; - public static final int FEED_DETAILS_ARECORD_PRIMARYKEY_FIELD_INDEX = 3; - public static final int FEED_DETAILS_ARECORD_GROUPNAME_FIELD_INDEX = 4; - public static final int FEED_DETAILS_ARECORD_DATASOURCE_ADAPTER_FIELD_INDEX = 5; - public static final int FEED_DETAILS_ARECORD_PROPERTIES_FIELD_INDEX = 6; - public static final int FEED_DETAILS_ARECORD_FUNCTION_FIELD_INDEX = 7; - public static final int FEED_DETAILS_ARECORD_STATE_FIELD_INDEX = 8; - public static final int FEED_DETAILS_ARECORD_COMPACTION_POLICY_FIELD_INDEX = 9; - public static final int FEED_DETAILS_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX = 10; - public static final ARecordType FEED_DETAILS_RECORDTYPE = createRecordType( - // RecordTypeName - null, - // FieldNames - new String[] { FIELD_NAME_FILE_STRUCTURE, FIELD_NAME_PARTITIONING_STRATEGY, FIELD_NAME_PARTITIONING_KEY, - FIELD_NAME_PRIMARY_KEY, FIELD_NAME_GROUP_NAME, FIELD_NAME_DATASOURCE_ADAPTER, FIELD_NAME_PROPERTIES, - FIELD_NAME_FUNCTION, FIELD_NAME_STATUS, FIELD_NAME_COMPACTION_POLICY, - FIELD_NAME_COMPACTION_POLICY_PROPERTIES }, - // FieldTypes - new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, new AOrderedListType(BuiltinType.ASTRING, null), - new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING, BuiltinType.ASTRING, - new AOrderedListType(DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE, null), - AUnionType.createUnknownableType(BuiltinType.ASTRING), BuiltinType.ASTRING, BuiltinType.ASTRING, - new AOrderedListType(COMPACTION_POLICY_PROPERTIES_RECORDTYPE, null) }, - //IsOpen? 
- true); - //---------------------------------------- Dataset ------------------------------------------// public static final String RECORD_NAME_DATASET = "DatasetRecordType"; public static final int DATASET_ARECORD_DATAVERSENAME_FIELD_INDEX = 0; @@ -383,60 +351,45 @@ public final class MetadataRecordTypes { BuiltinType.ASTRING }, //IsOpen? true); - //---------------------------------- Primary Feed Details -----------------------------------// - public static final int FEED_TYPE_PRIMARY_ARECORD_ADAPTER_NAME_FIELD_INDEX = 0; - public static final int FEED_TYPE_PRIMARY_ARECORD_ADAPTER_CONFIGURATION_FIELD_INDEX = 1; - public static final ARecordType PRIMARY_FEED_DETAILS_RECORDTYPE = createRecordType( - // RecordTypeName - null, - // FieldNames - new String[] { FIELD_NAME_ADAPTER_NAME, FIELD_NAME_ADAPTER_CONFIGURATION }, - // FieldTypes - new IAType[] { BuiltinType.ASTRING, - new AUnorderedListType(DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE, null) }, - //IsOpen? - true); - //--------------------------------- Secondary Feed Details ----------------------------------// - public static final int FEED_TYPE_SECONDARY_ARECORD_SOURCE_FEED_NAME_FIELD_INDEX = 0; - public static final ARecordType SECONDARY_FEED_DETAILS_RECORDTYPE = createRecordType( - // RecordTypeName - null, - // FieldNames - new String[] { FIELD_NAME_SOURCE_FEED_NAME }, - // FieldTypes - new IAType[] { BuiltinType.ASTRING }, - //IsOpen? 
- true); - //---------------------------------------- Feed Activity ------------------------------------// + + //---------------------------------------- Feed Details ------------------------------------// public static final String RECORD_NAME_FEED = "FeedRecordType"; - public static final int FEED_ACTIVITY_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0; - public static final int FEED_ACTIVITY_ARECORD_FEED_NAME_FIELD_INDEX = 1; - public static final int FEED_ACTIVITY_ARECORD_DATASET_NAME_FIELD_INDEX = 2; - public static final int FEED_ACTIVITY_ARECORD_ACTIVITY_ID_FIELD_INDEX = 3; - public static final int FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX = 4; - public static final int FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX = 5; - public static final int FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX = 6; public static final int FEED_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0; public static final int FEED_ARECORD_FEED_NAME_FIELD_INDEX = 1; - public static final int FEED_ARECORD_FUNCTION_FIELD_INDEX = 2; - public static final int FEED_ARECORD_FEED_TYPE_FIELD_INDEX = 3; - public static final int FEED_ARECORD_PRIMARY_TYPE_DETAILS_FIELD_INDEX = 4; - public static final int FEED_ARECORD_SECONDARY_TYPE_DETAILS_FIELD_INDEX = 5; - public static final int FEED_ARECORD_TIMESTAMP_FIELD_INDEX = 6; - public static final int FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX = 0; - public static final int FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX = 1; - public static final int FEED_ARECORD_SECONDARY_FIELD_DETAILS_SOURCE_FEED_NAME_FIELD_INDEX = 0; + public static final int FEED_ARECORD_ADAPTOR_NAME_INDEX = 2; + public static final int FEED_ARECORD_ADAPTOR_CONFIG_INDEX = 3; + public static final int FEED_ARECORD_TIMESTAMP_FIELD_INDEX = 4; public static final ARecordType FEED_RECORDTYPE = createRecordType( // RecordTypeName RECORD_NAME_FEED, // FieldNames - new String[] { FIELD_NAME_DATAVERSE_NAME, FIELD_NAME_FEED_NAME, FIELD_NAME_FUNCTION, 
FIELD_NAME_FEED_TYPE, - FIELD_NAME_PRIMARY_TYPE_DETAILS, FIELD_NAME_SECONDARY_TYPE_DETAILS, FIELD_NAME_TIMESTAMP }, + new String[] { FIELD_NAME_DATAVERSE_NAME, FIELD_NAME_FEED_NAME, FIELD_NAME_ADAPTER_NAME, + FIELD_NAME_ADAPTER_CONFIGURATION, FIELD_NAME_TIMESTAMP }, // FieldTypes - new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, - AUnionType.createUnknownableType(BuiltinType.ASTRING), BuiltinType.ASTRING, - AUnionType.createUnknownableType(PRIMARY_FEED_DETAILS_RECORDTYPE), - AUnionType.createUnknownableType(SECONDARY_FEED_DETAILS_RECORDTYPE), BuiltinType.ASTRING }, + new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, + new AUnorderedListType(FEED_ADAPTER_CONFIGURATION_RECORDTYPE, null), BuiltinType.ASTRING }, + //IsOpen? + true); + + //------------------------------------- Feed Connection ---------------------------------------// + public static final String RECORD_NAME_FEED_CONNECTION = "FeedConnectionRecordType"; + public static final int FEED_CONN_DATAVERSE_NAME_FIELD_INDEX = 0; + public static final int FEED_CONN_FEED_NAME_FIELD_INDEX = 1; + public static final int FEED_CONN_DATASET_NAME_FIELD_INDEX = 2; + public static final int FEED_CONN_OUTPUT_TYPE_INDEX = 3; + public static final int FEED_CONN_APPLIED_FUNCTIONS_FIELD_INDEX = 4; + public static final int FEED_CONN_POLICY_FIELD_INDEX = 5; + + + public static final ARecordType FEED_CONNECTION_RECORDTYPE = createRecordType( + // RecordTypeName + RECORD_NAME_FEED_CONNECTION, + // FieldNames + new String[] { FIELD_NAME_DATAVERSE_NAME, FIELD_NAME_FEED_NAME, FIELD_NAME_DATASET_NAME, + FIELD_NAME_RETURN_TYPE, FIELD_NAME_APPLIED_FUNCTIONS, FIELD_NAME_POLICY_NAME }, + // FieldTypes + new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, + new AUnorderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING}, //IsOpen? 
true); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java index 0d3d06d..26cec1e 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java @@ -28,6 +28,7 @@ import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor; import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.entities.FeedConnection; import org.apache.asterix.metadata.entities.FeedPolicyEntity; import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies; import org.apache.asterix.om.types.ARecordType; @@ -54,29 +55,29 @@ public class FeedDataSource extends DataSource implements IMutationDataSource { private final Feed feed; private final EntityId sourceFeedId; - private final IFeed.FeedType sourceFeedType; private final FeedRuntimeType location; private final String targetDataset; private final String[] locations; private final int computeCardinality; private final List<IAType> pkTypes; private final List<ScalarFunctionCallExpression> keyAccessExpression; + private final FeedConnection feedConnection; public FeedDataSource(Feed feed, DataSourceId id, String targetDataset, IAType itemType, IAType metaType, List<IAType> pkTypes, List<List<String>> partitioningKeys, List<ScalarFunctionCallExpression> keyAccessExpression, EntityId sourceFeedId, - IFeed.FeedType sourceFeedType, 
FeedRuntimeType location, String[] locations, INodeDomain domain) + FeedRuntimeType location, String[] locations, INodeDomain domain, FeedConnection feedConnection) throws AlgebricksException { super(id, itemType, metaType, Type.FEED, domain); this.feed = feed; this.targetDataset = targetDataset; this.sourceFeedId = sourceFeedId; - this.sourceFeedType = sourceFeedType; this.location = location; this.locations = locations; this.pkTypes = pkTypes; this.keyAccessExpression = keyAccessExpression; this.computeCardinality = ClusterStateManager.INSTANCE.getParticipantNodes().size(); + this.feedConnection = feedConnection; initFeedDataSource(); } @@ -120,10 +121,6 @@ public class FeedDataSource extends DataSource implements IMutationDataSource { } } - public IFeed.FeedType getSourceFeedType() { - return sourceFeedType; - } - public int getComputeCardinality() { return computeCardinality; } @@ -196,7 +193,7 @@ public class FeedDataSource extends DataSource implements IMutationDataSource { FeedConnectionId feedConnectionId = new FeedConnectionId(getId().getDataverseName(), getId().getDatasourceName(), getTargetDataset()); FeedCollectOperatorDescriptor feedCollector = new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId, - getSourceFeedId(), feedOutputType, feedDesc, feedPolicy.getProperties(), getLocation()); + feedOutputType, feedDesc, feedPolicy.getProperties(), getLocation()); return new Pair<>(feedCollector, new AlgebricksAbsolutePartitionConstraint(getLocations())); @@ -209,4 +206,8 @@ public class FeedDataSource extends DataSource implements IMutationDataSource { public boolean isScanAccessPathALeaf() { return true; } + + public FeedConnection getFeedConnection() { + return feedConnection; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java ---------------------------------------------------------------------- diff 
--git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java index f1a90c7..b647bb7 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java @@ -29,6 +29,7 @@ import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.DatasourceAdapter; import org.apache.asterix.metadata.entities.Datatype; import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.entities.FeedConnection; import org.apache.asterix.metadata.entities.FeedPolicyEntity; import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.metadata.entities.NodeGroup; @@ -132,6 +133,15 @@ public class MetadataManagerUtil { } } + public static FeedConnection findFeedConnection(MetadataTransactionContext mdTxnCtx, String dataverse, + String feedName, String datasetName) throws AlgebricksException { + try { + return MetadataManager.INSTANCE.getFeedConnection(mdTxnCtx, dataverse, feedName, datasetName); + } catch (MetadataException e) { + throw new AlgebricksException(e); + } + } + public static FeedPolicyEntity findFeedPolicy(MetadataTransactionContext mdTxnCtx, String dataverse, String policyName) throws AlgebricksException { try { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 1beaed0..f5c6d9a 100644 --- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -68,6 +68,7 @@ import org.apache.asterix.metadata.entities.DatasourceAdapter; import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.metadata.entities.ExternalDatasetDetails; import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.entities.FeedConnection; import org.apache.asterix.metadata.entities.FeedPolicyEntity; import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.metadata.entities.InternalDatasetDetails; @@ -329,6 +330,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> return MetadataManagerUtil.findFeed(mdTxnCtx, dataverse, feedName); } + public FeedConnection findFeedConnection(String dataverseName, String feedName, String datasetName) + throws AlgebricksException { + return MetadataManagerUtil.findFeedConnection(mdTxnCtx, dataverseName, feedName, datasetName); + } + public FeedPolicyEntity findFeedPolicy(String dataverse, String policyName) throws AlgebricksException { return MetadataManagerUtil.findFeedPolicy(mdTxnCtx, dataverse, policyName); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index d55fde5..2e328f9 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -29,6 +29,7 @@ import java.util.logging.Logger; import 
org.apache.asterix.active.ActiveJobNotificationHandler; import org.apache.asterix.active.IActiveEntityEventsListener; import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.metadata.IDataset; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; @@ -93,7 +94,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; /** * Metadata describing a dataset. */ -public class Dataset implements IMetadataEntity<Dataset> { +public class Dataset implements IMetadataEntity<Dataset>, IDataset { /* * Constants @@ -278,7 +279,9 @@ public class Dataset implements IMetadataEntity<Dataset> { // prepare job spec(s) that would disconnect any active feeds involving the dataset. IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); for (IActiveEntityEventsListener listener : activeListeners) { - if (listener.isEntityActive() && listener.isEntityUsingDataset(dataverseName, datasetName)) { + IDataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), + dataverseName, datasetName); + if (listener.isEntityUsingDataset(ds)) { throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET, RecordUtil.toFullyQualifiedName(dataverseName, datasetName), listener.getEntityId().toString()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java index 1343e53..ea0e4eb 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java +++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java @@ -22,7 +22,6 @@ package org.apache.asterix.metadata.entities; import java.util.Map; import org.apache.asterix.active.EntityId; -import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.external.feed.api.IFeed; import org.apache.asterix.metadata.MetadataCache; import org.apache.asterix.metadata.api.IMetadataEntity; @@ -36,28 +35,18 @@ public class Feed implements IMetadataEntity<Feed>, IFeed { /** A unique identifier for the feed */ private EntityId feedId; - /** The function that is to be applied on each incoming feed tuple **/ - private FunctionSignature appliedFunction; - /** The type {@code FeedType} associated with the feed. **/ - private IFeed.FeedType feedType; /** A string representation of the instance **/ private String displayName; /** A string representation of the adapter name **/ private String adapterName; /** Adapter configuration */ private Map<String, String> adapterConfiguration; - /** Source primary feed */ - private String sourceFeedName; - public Feed(String dataverseName, String feedName, FunctionSignature appliedFunction, IFeed.FeedType feedType, - String sourceFeedName, String adapterName, Map<String, String> configuration) { + public Feed(String dataverseName, String feedName,String adapterName, Map<String, String> configuration) { this.feedId = new EntityId(EXTENSION_NAME, dataverseName, feedName); - this.appliedFunction = appliedFunction; - this.feedType = feedType; - this.displayName = feedType + "(" + feedId + ")"; + this.displayName = "(" + feedId + ")"; this.adapterName = adapterName; this.adapterConfiguration = configuration; - this.sourceFeedName = sourceFeedName; } @Override @@ -76,16 +65,6 @@ public class Feed implements IMetadataEntity<Feed>, IFeed { } @Override - public FunctionSignature getAppliedFunction() { - return appliedFunction; - } - - @Override - public IFeed.FeedType getFeedType() { - return 
feedType; - } - - @Override public boolean equals(Object other) { if (this == other) { return true; @@ -104,7 +83,7 @@ public class Feed implements IMetadataEntity<Feed>, IFeed { @Override public String toString() { - return feedType + "(" + feedId + ")"; + return feedId.toString(); } @Override @@ -126,8 +105,4 @@ public class Feed implements IMetadataEntity<Feed>, IFeed { public Map<String, String> getAdapterConfiguration() { return adapterConfiguration; } - - public String getSourceFeedName() { - return sourceFeedName; - } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java new file mode 100644 index 0000000..b05e61e --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.metadata.entities; + +import org.apache.asterix.active.EntityId; +import org.apache.asterix.common.functions.FunctionSignature; +import org.apache.asterix.external.util.FeedUtils; +import org.apache.asterix.metadata.MetadataCache; +import org.apache.asterix.metadata.api.IMetadataEntity; +import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies; + +import java.util.ArrayList; +import java.util.List; + +/** + * Feed connection records the feed --> dataset mapping. + */ +public class FeedConnection implements IMetadataEntity<FeedConnection> { + + private static final long serialVersionUID = 1L; + + private EntityId feedId; + private String connectionId; + private String dataverseName; + private String feedName; + private String datasetName; + private String policyName; + private String outputType; + private List<FunctionSignature> appliedFunctions; + + public FeedConnection(String dataverseName, String feedName, String datasetName, + List<FunctionSignature> appliedFunctions, String policyName, String outputType) { + this.dataverseName = dataverseName; + this.feedName = feedName; + this.datasetName = datasetName; + this.appliedFunctions = appliedFunctions; + this.connectionId = feedName + ":" + datasetName; + this.policyName = policyName; + this.outputType = outputType; + this.feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, dataverseName, feedName); + } + + public List<FunctionSignature> getAppliedFunctions() { + return appliedFunctions; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof FeedConnection)) { + return false; + } + return ((FeedConnection) other).getConnectionId().equals(connectionId); + } + + @Override + public int hashCode() { + return connectionId.hashCode(); + } + + @Override + public FeedConnection addToCache(MetadataCache cache) { + return null; + } + + @Override + public FeedConnection dropFromCache(MetadataCache cache) { + return 
null; + } + + public String getDataverseName() { + return dataverseName; + } + + public String getDatasetName() { + return datasetName; + } + + public String getConnectionId() { + return connectionId; + } + + public String getFeedName() { + return feedName; + } + + public String getPolicyName() { + return policyName; + } + + public String getOutputType() { + return outputType; + } + + public EntityId getFeedId() { + return feedId; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java new file mode 100644 index 0000000..e7fe5b4 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.metadata.entitytupletranslators; + +import org.apache.asterix.builders.IARecordBuilder; +import org.apache.asterix.builders.UnorderedListBuilder; +import org.apache.asterix.common.functions.FunctionSignature; +import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; +import org.apache.asterix.metadata.MetadataException; +import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes; +import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes; +import org.apache.asterix.metadata.entities.FeedConnection; +import org.apache.asterix.om.base.*; +import org.apache.asterix.om.types.AUnorderedListType; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class FeedConnectionTupleTranslator extends AbstractTupleTranslator<FeedConnection> { + + public static final int FEED_CONN_DATAVERSE_NAME_FIELD_INDEX = 0; + public static final int FEED_CONN_FEED_NAME_FIELD_INDEX = 1; + public static final int FEED_CONN_DATASET_NAME_FIELD_INDEX = 2; + + public static final int FEED_CONN_PAYLOAD_TUPLE_FIELD_INDEX = 3; + + private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(MetadataRecordTypes.FEED_CONNECTION_RECORDTYPE); + + public FeedConnectionTupleTranslator(boolean getTuple) { + super(getTuple, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET.getFieldCount()); + } + + @Override + public FeedConnection getMetadataEntityFromTuple(ITupleReference frameTuple) throws MetadataException, IOException { + byte[] serRecord = 
frameTuple.getFieldData(FEED_CONN_PAYLOAD_TUPLE_FIELD_INDEX); + int recordStartOffset = frameTuple.getFieldStart(FEED_CONN_PAYLOAD_TUPLE_FIELD_INDEX); + int recordLength = frameTuple.getFieldLength(FEED_CONN_PAYLOAD_TUPLE_FIELD_INDEX); + ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength); + DataInput in = new DataInputStream(stream); + ARecord feedConnRecord = recordSerDes.deserialize(in); + return createFeedConnFromRecord(feedConnRecord); + } + + private FeedConnection createFeedConnFromRecord(ARecord feedConnRecord) { + String dataverseName = ((AString) feedConnRecord + .getValueByPos(MetadataRecordTypes.FEED_CONN_DATAVERSE_NAME_FIELD_INDEX)).getStringValue(); + String feedName = ((AString) feedConnRecord.getValueByPos(MetadataRecordTypes.FEED_CONN_FEED_NAME_FIELD_INDEX)) + .getStringValue(); + String datasetName = ((AString) feedConnRecord + .getValueByPos(MetadataRecordTypes.FEED_CONN_DATASET_NAME_FIELD_INDEX)).getStringValue(); + String outputType = ((AString) feedConnRecord.getValueByPos(MetadataRecordTypes.FEED_CONN_OUTPUT_TYPE_INDEX)) + .getStringValue(); + String policyName = ((AString) feedConnRecord.getValueByPos(MetadataRecordTypes.FEED_CONN_POLICY_FIELD_INDEX)) + .getStringValue(); + ArrayList<FunctionSignature> appliedFunctions = null; + Object o = feedConnRecord.getValueByPos(MetadataRecordTypes.FEED_CONN_APPLIED_FUNCTIONS_FIELD_INDEX); + IACursor cursor; + + if (!(o instanceof ANull) && !(o instanceof AMissing)) { + appliedFunctions = new ArrayList<>(); + FunctionSignature functionSignature; + cursor = ((AUnorderedList) feedConnRecord + .getValueByPos(MetadataRecordTypes.FEED_CONN_APPLIED_FUNCTIONS_FIELD_INDEX)).getCursor(); + while (cursor.next()) { + //TODO: allow different arity + functionSignature = new FunctionSignature(dataverseName, ((AString) cursor.get()).getStringValue(), 1); + appliedFunctions.add(functionSignature); + } + } + + return new FeedConnection(dataverseName, feedName, datasetName, 
appliedFunctions, policyName, outputType); + } + + @Override + public ITupleReference getTupleFromMetadataEntity(FeedConnection me) throws MetadataException, IOException { + tupleBuilder.reset(); + + // key: dataverse + aString.setValue(me.getDataverseName()); + stringSerde.serialize(aString, tupleBuilder.getDataOutput()); + tupleBuilder.addFieldEndOffset(); + + // key: feedName + aString.setValue(me.getFeedName()); + stringSerde.serialize(aString, tupleBuilder.getDataOutput()); + tupleBuilder.addFieldEndOffset(); + + // key: dataset + aString.setValue(me.getDatasetName()); + stringSerde.serialize(aString, tupleBuilder.getDataOutput()); + tupleBuilder.addFieldEndOffset(); + + recordBuilder.reset(MetadataRecordTypes.FEED_CONNECTION_RECORDTYPE); + // field dataverse + fieldValue.reset(); + aString.setValue(me.getDataverseName()); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + recordBuilder.addField(MetadataRecordTypes.FEED_CONN_DATAVERSE_NAME_FIELD_INDEX, fieldValue); + + // field: feedId + fieldValue.reset(); + aString.setValue(me.getFeedName()); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + recordBuilder.addField(MetadataRecordTypes.FEED_CONN_FEED_NAME_FIELD_INDEX, fieldValue); + + // field: dataset + fieldValue.reset(); + aString.setValue(me.getDatasetName()); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + recordBuilder.addField(MetadataRecordTypes.FEED_CONN_DATASET_NAME_FIELD_INDEX, fieldValue); + + // field: outputType + fieldValue.reset(); + aString.setValue(me.getOutputType()); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + recordBuilder.addField(MetadataRecordTypes.FEED_CONN_OUTPUT_TYPE_INDEX, fieldValue); + + // field: appliedFunctions + fieldValue.reset(); + writeAppliedFunctionsField(recordBuilder, me, fieldValue); + + // field: policyName + fieldValue.reset(); + aString.setValue(me.getPolicyName()); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + 
recordBuilder.addField(MetadataRecordTypes.FEED_CONN_POLICY_FIELD_INDEX, fieldValue); + + recordBuilder.write(tupleBuilder.getDataOutput(), true); + tupleBuilder.addFieldEndOffset(); + + tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray()); + return tuple; + } + + /** + * Serializes the names of the functions applied on a feed connection as an unordered list and adds the list + * to the record under {@code FEED_CONN_APPLIED_FUNCTIONS_FIELD_INDEX}. When the connection's function list + * is {@code null} the list is written with no items. The element buffer is reset before every item, as + * FeedTupleTranslator does for its configuration list, so that an item holds only its own name and not the + * bytes of the names serialized before it. + * + * @param rb record builder that receives the list field + * @param fc feed connection whose applied functions are written + * @param buffer storage the finished list is written to before it is handed to {@code rb} + * @throws HyracksDataException declared by the signature for failures while serializing or writing the list + */ + private void writeAppliedFunctionsField(IARecordBuilder rb, FeedConnection fc, ArrayBackedValueStorage buffer) + throws HyracksDataException { + UnorderedListBuilder listBuilder = new UnorderedListBuilder(); + ArrayBackedValueStorage listEleBuffer = new ArrayBackedValueStorage(); + + listBuilder.reset((AUnorderedListType) MetadataRecordTypes.FEED_CONNECTION_RECORDTYPE + .getFieldTypes()[MetadataRecordTypes.FEED_CONN_APPLIED_FUNCTIONS_FIELD_INDEX]); + List<FunctionSignature> appliedFunctions = fc.getAppliedFunctions(); + if (appliedFunctions != null) { + for (FunctionSignature af : appliedFunctions) { + listEleBuffer.reset(); + aString.setValue(af.getName()); + stringSerde.serialize(aString, listEleBuffer.getDataOutput()); + listBuilder.addItem(listEleBuffer); + } + } + listBuilder.write(buffer.getDataOutput(), true); + rb.addField(MetadataRecordTypes.FEED_CONN_APPLIED_FUNCTIONS_FIELD_INDEX, buffer); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fff200ca/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java index dc0b9c9..4503e09 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java @@ -29,19 +29,14 @@ import 
java.util.HashMap; import java.util.Map; import org.apache.asterix.builders.IARecordBuilder; -import org.apache.asterix.builders.OrderedListBuilder; import org.apache.asterix.builders.RecordBuilder; -import org.apache.asterix.common.functions.FunctionSignature; -import org.apache.asterix.external.feed.api.IFeed; -import org.apache.asterix.external.feed.api.IFeed.FeedType; +import org.apache.asterix.builders.UnorderedListBuilder; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes; import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes; import org.apache.asterix.metadata.entities.Feed; -import org.apache.asterix.om.base.AMissing; import org.apache.asterix.om.base.AMutableString; -import org.apache.asterix.om.base.ANull; import org.apache.asterix.om.base.ARecord; import org.apache.asterix.om.base.AString; import org.apache.asterix.om.base.AUnorderedList; @@ -67,8 +62,8 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> { public static final int FEED_PAYLOAD_TUPLE_FIELD_INDEX = 2; @SuppressWarnings("unchecked") - private ISerializerDeserializer<ARecord> recordSerDes = - SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(MetadataRecordTypes.FEED_RECORDTYPE); + private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(MetadataRecordTypes.FEED_RECORDTYPE); protected FeedTupleTranslator(boolean getTuple) { super(getTuple, MetadataPrimaryIndexes.FEED_DATASET.getFieldCount()); @@ -86,65 +81,30 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> { } private Feed createFeedFromARecord(ARecord feedRecord) { - Feed feed = null; - String dataverseName = - ((AString) feedRecord.getValueByPos(MetadataRecordTypes.FEED_ARECORD_DATAVERSE_NAME_FIELD_INDEX)) - .getStringValue(); + Feed feed; + String 
dataverseName = ((AString) feedRecord + .getValueByPos(MetadataRecordTypes.FEED_ARECORD_DATAVERSE_NAME_FIELD_INDEX)).getStringValue(); String feedName = ((AString) feedRecord.getValueByPos(MetadataRecordTypes.FEED_ARECORD_FEED_NAME_FIELD_INDEX)) .getStringValue(); - Object o = feedRecord.getValueByPos(MetadataRecordTypes.FEED_ARECORD_FUNCTION_FIELD_INDEX); - FunctionSignature signature = null; - if (!(o instanceof ANull) && !(o instanceof AMissing)) { - String functionName = ((AString) o).getStringValue(); - signature = new FunctionSignature(dataverseName, functionName, 1); + AUnorderedList feedConfig = (AUnorderedList) feedRecord + .getValueByPos(MetadataRecordTypes.FEED_ARECORD_ADAPTOR_CONFIG_INDEX); + String adapterName = ((AString) feedRecord + .getValueByPos(MetadataRecordTypes.FEED_ARECORD_ADAPTOR_NAME_INDEX)).getStringValue(); + + IACursor cursor = feedConfig.getCursor(); + + // restore configurations + String key; + String value; + Map<String, String> adaptorConfiguration = new HashMap<>(); + while (cursor.next()) { + ARecord field = (ARecord) cursor.get(); + key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)).getStringValue(); + value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)).getStringValue(); + adaptorConfiguration.put(key, value); } - - String feedType = ((AString) feedRecord.getValueByPos(MetadataRecordTypes.FEED_ARECORD_FEED_TYPE_FIELD_INDEX)) - .getStringValue(); - - IFeed.FeedType feedTypeEnum = IFeed.FeedType.valueOf(feedType.toUpperCase()); - switch (feedTypeEnum) { - case PRIMARY: { - ARecord feedTypeDetailsRecord = (ARecord) feedRecord - .getValueByPos(MetadataRecordTypes.FEED_ARECORD_PRIMARY_TYPE_DETAILS_FIELD_INDEX); - String adapterName = ((AString) feedTypeDetailsRecord - .getValueByPos(MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX)) - .getStringValue(); - - IACursor cursor = ((AUnorderedList) feedTypeDetailsRecord.getValueByPos( - 
MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX)) - .getCursor(); - String key; - String value; - Map<String, String> adaptorConfiguration = new HashMap<String, String>(); - while (cursor.next()) { - ARecord field = (ARecord) cursor.get(); - key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)) - .getStringValue(); - value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)) - .getStringValue(); - adaptorConfiguration.put(key, value); - } - feed = new Feed(dataverseName, feedName, signature, FeedType.PRIMARY, feedName, adapterName, - adaptorConfiguration); - - } - break; - case SECONDARY: { - ARecord feedTypeDetailsRecord = (ARecord) feedRecord - .getValueByPos(MetadataRecordTypes.FEED_ARECORD_SECONDARY_TYPE_DETAILS_FIELD_INDEX); - - String sourceFeedName = ((AString) feedTypeDetailsRecord - .getValueByPos(MetadataRecordTypes.FEED_TYPE_SECONDARY_ARECORD_SOURCE_FEED_NAME_FIELD_INDEX)) - .getStringValue(); - - feed = new Feed(dataverseName, feedName, signature, FeedType.SECONDARY, sourceFeedName, null, null); - - } - break; - } - + feed = new Feed(dataverseName, feedName, adapterName, adaptorConfiguration); return feed; } @@ -162,37 +122,29 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> { recordBuilder.reset(MetadataRecordTypes.FEED_RECORDTYPE); - // write field 0 + // write dataverse name fieldValue.reset(); aString.setValue(feed.getDataverseName()); stringSerde.serialize(aString, fieldValue.getDataOutput()); recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_DATAVERSE_NAME_FIELD_INDEX, fieldValue); - // write field 1 + // write feed name fieldValue.reset(); aString.setValue(feed.getFeedName()); stringSerde.serialize(aString, fieldValue.getDataOutput()); recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_FEED_NAME_FIELD_INDEX, fieldValue); - // write field 2 + // adaptor name fieldValue.reset(); - if (feed.getAppliedFunction() 
!= null) { - aString.setValue(feed.getAppliedFunction().getName()); - stringSerde.serialize(aString, fieldValue.getDataOutput()); - recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_FUNCTION_FIELD_INDEX, fieldValue); - } - - // write field 3 - fieldValue.reset(); - aString.setValue(feed.getFeedType().name().toUpperCase()); + aString.setValue(feed.getAdapterName()); stringSerde.serialize(aString, fieldValue.getDataOutput()); - recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_FEED_TYPE_FIELD_INDEX, fieldValue); + recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_ADAPTOR_NAME_INDEX, fieldValue); - // write field 4/5 + // write adaptor configuration fieldValue.reset(); - writeFeedTypeDetailsRecordType(recordBuilder, feed, fieldValue); + writeFeedAdaptorField(recordBuilder, feed, fieldValue); - // write field 6 + // write timestamp fieldValue.reset(); aString.setValue(Calendar.getInstance().getTime().toString()); stringSerde.serialize(aString, fieldValue.getDataOutput()); @@ -206,81 +158,32 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> { return tuple; } - @SuppressWarnings("unchecked") - private void writeFeedTypeDetailsRecordType(IARecordBuilder recordBuilder, Feed feed, - ArrayBackedValueStorage fieldValue) throws HyracksDataException { - - switch (feed.getFeedType()) { - case PRIMARY: { - - IARecordBuilder primaryDetailsRecordBuilder = new RecordBuilder(); - OrderedListBuilder listBuilder = new OrderedListBuilder(); - ArrayBackedValueStorage primaryRecordfieldValue = new ArrayBackedValueStorage(); - ArrayBackedValueStorage primaryRecordItemValue = new ArrayBackedValueStorage(); - primaryDetailsRecordBuilder.reset(MetadataRecordTypes.PRIMARY_FEED_DETAILS_RECORDTYPE); - - AMutableString aString = new AMutableString(""); - ISerializerDeserializer<AString> stringSerde = - SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING); - - // write field 0 - fieldValue.reset(); - 
aString.setValue(feed.getAdapterName()); - stringSerde.serialize(aString, primaryRecordfieldValue.getDataOutput()); - primaryDetailsRecordBuilder.addField( - MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX, - primaryRecordfieldValue); - - // write field 1 - listBuilder.reset((AUnorderedListType) MetadataRecordTypes.PRIMARY_FEED_DETAILS_RECORDTYPE - .getFieldTypes()[MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX]); - for (Map.Entry<String, String> property : feed.getAdapterConfiguration().entrySet()) { - String name = property.getKey(); - String value = property.getValue(); - primaryRecordItemValue.reset(); - writePropertyTypeRecord(name, value, primaryRecordItemValue.getDataOutput()); - listBuilder.addItem(primaryRecordItemValue); - } - primaryRecordfieldValue.reset(); - listBuilder.write(primaryRecordfieldValue.getDataOutput(), true); - primaryDetailsRecordBuilder.addField( - MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX, - primaryRecordfieldValue); - - primaryDetailsRecordBuilder.write(fieldValue.getDataOutput(), true); - - recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_PRIMARY_TYPE_DETAILS_FIELD_INDEX, fieldValue); - } - break; - - case SECONDARY: - IARecordBuilder secondaryDetailsRecordBuilder = new RecordBuilder(); - ArrayBackedValueStorage secondaryFieldValue = new ArrayBackedValueStorage(); - secondaryDetailsRecordBuilder.reset(MetadataRecordTypes.SECONDARY_FEED_DETAILS_RECORDTYPE); - - // write field 0 - fieldValue.reset(); - aString.setValue(feed.getSourceFeedName()); - stringSerde.serialize(aString, secondaryFieldValue.getDataOutput()); - secondaryDetailsRecordBuilder.addField( - MetadataRecordTypes.FEED_ARECORD_SECONDARY_FIELD_DETAILS_SOURCE_FEED_NAME_FIELD_INDEX, - secondaryFieldValue); - - secondaryDetailsRecordBuilder.write(fieldValue.getDataOutput(), true); - 
recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_SECONDARY_TYPE_DETAILS_FIELD_INDEX, fieldValue); - break; + /** + * Writes the feed's adaptor configuration into the record as an unordered list. Every entry of + * {@code feed.getAdapterConfiguration()} is serialized by {@code writePropertyTypeRecord} into an element + * buffer that is reset before each entry, then added to the list; the finished list is written to + * {@code fieldValueBuffer} and added to the record at {@code FEED_ARECORD_ADAPTOR_CONFIG_INDEX}. + * NOTE(review): the configuration map is dereferenced without a null check - confirm it can never be null. + * + * @param recordBuilder record builder that receives the list field + * @param feed feed whose adaptor configuration is written + * @param fieldValueBuffer storage the finished list is written to before it is handed to the builder + * @throws HyracksDataException declared by the signature for failures while serializing an entry or the list + */ private void writeFeedAdaptorField(IARecordBuilder recordBuilder, Feed feed, + ArrayBackedValueStorage fieldValueBuffer) throws HyracksDataException { + UnorderedListBuilder listBuilder = new UnorderedListBuilder(); + ArrayBackedValueStorage listEleBuffer = new ArrayBackedValueStorage(); + + listBuilder.reset((AUnorderedListType) MetadataRecordTypes.FEED_RECORDTYPE + .getFieldTypes()[MetadataRecordTypes.FEED_ARECORD_ADAPTOR_CONFIG_INDEX]); + for (Map.Entry<String, String> property : feed.getAdapterConfiguration().entrySet()) { + String name = property.getKey(); + String value = property.getValue(); + listEleBuffer.reset(); + writePropertyTypeRecord(name, value, listEleBuffer.getDataOutput()); + listBuilder.addItem(listEleBuffer); } - + listBuilder.write(fieldValueBuffer.getDataOutput(), true); + recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_ADAPTOR_CONFIG_INDEX, fieldValueBuffer); } @SuppressWarnings("unchecked") public void writePropertyTypeRecord(String name, String value, DataOutput out) throws HyracksDataException { IARecordBuilder propertyRecordBuilder = new RecordBuilder(); ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage(); - propertyRecordBuilder.reset(MetadataRecordTypes.FEED_ADAPTER_CONFIGURATION_RECORDTYPE); + propertyRecordBuilder.reset(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE); AMutableString aString = new AMutableString(""); - ISerializerDeserializer<AString> stringSerde = - SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING); + ISerializerDeserializer<AString> stringSerde = SerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(BuiltinType.ASTRING); // write field 0 fieldValue.reset();
