http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java index 72285d0..406b3d6 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java @@ -22,7 +22,7 @@ package org.apache.asterix.metadata; import java.util.ArrayList; import org.apache.asterix.common.functions.FunctionSignature; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.external.dataset.adapter.AdapterIdentifier; import org.apache.asterix.metadata.entities.CompactionPolicy; import org.apache.asterix.metadata.entities.Dataset; @@ -68,14 +68,14 @@ public class MetadataTransactionContext extends MetadataCache { protected MetadataCache droppedCache = new MetadataCache(); protected ArrayList<MetadataLogicalOperation> opLog = new ArrayList<>(); - private final JobId jobId; + private final TxnId txnId; - public MetadataTransactionContext(JobId jobId) { - this.jobId = jobId; + public MetadataTransactionContext(TxnId txnId) { + this.txnId = txnId; } - public JobId getJobId() { - return jobId; + public TxnId getTxnId() { + return txnId; } public void addDataverse(Dataverse dataverse) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java index 8c4920f..cdb27d7 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java @@ -26,7 +26,7 @@ import java.util.List; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.functions.FunctionSignature; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.metadata.entities.CompactionPolicy; import org.apache.asterix.metadata.entities.Dataset; @@ -59,7 +59,7 @@ public interface IMetadataNode extends Remote, Serializable { * @throws ACIDException * @throws RemoteException */ - void beginTransaction(JobId jobId) throws ACIDException, RemoteException; + void beginTransaction(TxnId txnId) throws ACIDException, RemoteException; /** * Commits a local transaction against the metadata. @@ -67,7 +67,7 @@ public interface IMetadataNode extends Remote, Serializable { * @throws ACIDException * @throws RemoteException */ - void commitTransaction(JobId jobId) throws ACIDException, RemoteException; + void commitTransaction(TxnId txnId) throws ACIDException, RemoteException; /** * Aborts a local transaction against the metadata. @@ -75,7 +75,7 @@ public interface IMetadataNode extends Remote, Serializable { * @throws ACIDException * @throws RemoteException */ - void abortTransaction(JobId jobId) throws ACIDException, RemoteException; + void abortTransaction(TxnId txnId) throws ACIDException, RemoteException; /** * Locally locks the entire metadata in given mode on behalf of given @@ -84,7 +84,7 @@ public interface IMetadataNode extends Remote, Serializable { * @throws ACIDException * @throws RemoteException */ - void lock(JobId jobId, byte lockMode) throws ACIDException, RemoteException; + void lock(TxnId txnId, byte lockMode) throws ACIDException, RemoteException; /** * Releases all local locks of given transaction id. @@ -92,13 +92,13 @@ public interface IMetadataNode extends Remote, Serializable { * @throws ACIDException * @throws RemoteException */ - void unlock(JobId jobId, byte lockMode) throws ACIDException, RemoteException; + void unlock(TxnId txnId, byte lockMode) throws ACIDException, RemoteException; /** * Inserts a new dataverse into the metadata, acquiring local locks on behalf of * the given transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataverse * Dataverse instance to be inserted. @@ -106,26 +106,26 @@ public interface IMetadataNode extends Remote, Serializable { * For example, if the dataverse already exists. * @throws RemoteException */ - void addDataverse(JobId jobId, Dataverse dataverse) throws AlgebricksException, RemoteException; + void addDataverse(TxnId txnId, Dataverse dataverse) throws AlgebricksException, RemoteException; /** * Retrieves all dataverses, acquiring local locks on behalf of the given * transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @return A list of dataverse instances. * @throws AlgebricksException * For example, if the dataverse does not exist. * @throws RemoteException */ - List<Dataverse> getDataverses(JobId jobId) throws AlgebricksException, RemoteException; + List<Dataverse> getDataverses(TxnId txnId) throws AlgebricksException, RemoteException; /** * Retrieves a dataverse with given name, acquiring local locks on behalf of the * given transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataverseName * Name of the dataverse to retrieve. @@ -134,13 +134,13 @@ public interface IMetadataNode extends Remote, Serializable { * For example, if the dataverse does not exist. * @throws RemoteException */ - Dataverse getDataverse(JobId jobId, String dataverseName) throws AlgebricksException, RemoteException; + Dataverse getDataverse(TxnId txnId, String dataverseName) throws AlgebricksException, RemoteException; /** * Retrieves all datasets belonging to the given dataverse, acquiring local * locks on behalf of the given transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataverseName * Name of the dataverse of which to find all datasets. @@ -148,27 +148,27 @@ public interface IMetadataNode extends Remote, Serializable { * @throws AlgebricksException * For example, if the dataverse does not exist. RemoteException */ - List<Dataset> getDataverseDatasets(JobId jobId, String dataverseName) throws AlgebricksException, RemoteException; + List<Dataset> getDataverseDatasets(TxnId txnId, String dataverseName) throws AlgebricksException, RemoteException; /** * Deletes the dataverse with given name, and all it's associated datasets, * indexes, and types, acquiring local locks on behalf of the given transaction * id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @return A list of dataset instances. * @throws AlgebricksException * For example, if the dataverse does not exist. * @throws RemoteException */ - void dropDataverse(JobId jobId, String dataverseName) throws AlgebricksException, RemoteException; + void dropDataverse(TxnId txnId, String dataverseName) throws AlgebricksException, RemoteException; /** * Inserts a new dataset into the metadata, acquiring local locks on behalf of * the given transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataset * Dataset instance to be inserted. @@ -176,13 +176,13 @@ public interface IMetadataNode extends Remote, Serializable { * For example, if the dataset already exists. * @throws RemoteException */ - void addDataset(JobId jobId, Dataset dataset) throws AlgebricksException, RemoteException; + void addDataset(TxnId txnId, Dataset dataset) throws AlgebricksException, RemoteException; /** * Retrieves a dataset within a given dataverse, acquiring local locks on behalf * of the given transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataverseName * Dataverse name to look for the dataset. @@ -193,14 +193,14 @@ public interface IMetadataNode extends Remote, Serializable { * For example, if the dataset does not exist. * @throws RemoteException */ - Dataset getDataset(JobId jobId, String dataverseName, String datasetName) + Dataset getDataset(TxnId txnId, String dataverseName, String datasetName) throws AlgebricksException, RemoteException; /** * Retrieves all indexes of a dataset, acquiring local locks on behalf of the * given transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataverseName * Name of dataverse which holds the given dataset. @@ -211,14 +211,14 @@ public interface IMetadataNode extends Remote, Serializable { * For example, if the dataset and/or dataverse does not exist. * @throws RemoteException */ - List<Index> getDatasetIndexes(JobId jobId, String dataverseName, String datasetName) + List<Index> getDatasetIndexes(TxnId txnId, String dataverseName, String datasetName) throws AlgebricksException, RemoteException; /** * Deletes the dataset with given name, and all it's associated indexes, * acquiring local locks on behalf of the given transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataverseName * Name of dataverse which holds the given dataset. @@ -228,14 +228,14 @@ public interface IMetadataNode extends Remote, Serializable { * For example, if the dataset and/or dataverse does not exist. * @throws RemoteException */ - void dropDataset(JobId jobId, String dataverseName, String datasetName) throws AlgebricksException, RemoteException; + void dropDataset(TxnId txnId, String dataverseName, String datasetName) throws AlgebricksException, RemoteException; /** * Inserts an index into the metadata, acquiring local locks on behalf of the * given transaction id. The index itself knows its name, and which dataset it * belongs to. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param index * Index instance to be inserted. @@ -243,13 +243,13 @@ public interface IMetadataNode extends Remote, Serializable { * For example, if the index already exists. * @throws RemoteException */ - void addIndex(JobId jobId, Index index) throws AlgebricksException, RemoteException; + void addIndex(TxnId txnId, Index index) throws AlgebricksException, RemoteException; /** * Retrieves the index with given name, in given dataverse and dataset, * acquiring local locks on behalf of the given transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataverseName * Name of the datavers holding the given dataset. @@ -261,14 +261,14 @@ public interface IMetadataNode extends Remote, Serializable { * For example, if the index does not exist. * @throws RemoteException */ - Index getIndex(JobId jobId, String dataverseName, String datasetName, String indexName) + Index getIndex(TxnId txnId, String dataverseName, String datasetName, String indexName) throws AlgebricksException, RemoteException; /** * Deletes the index with given name, in given dataverse and dataset, acquiring * local locks on behalf of the given transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataverseName * Name of the datavers holding the given dataset. @@ -279,14 +279,14 @@ public interface IMetadataNode extends Remote, Serializable { * For example, if the index does not exist. * @throws RemoteException */ - void dropIndex(JobId jobId, String dataverseName, String datasetName, String indexName) + void dropIndex(TxnId txnId, String dataverseName, String datasetName, String indexName) throws AlgebricksException, RemoteException; /** * Inserts a datatype, acquiring local locks on behalf of the given transaction * id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param datatype * Datatype instance to be inserted. @@ -294,13 +294,13 @@ public interface IMetadataNode extends Remote, Serializable { * For example, if the datatype already exists. * @throws RemoteException */ - void addDatatype(JobId jobId, Datatype datatype) throws AlgebricksException, RemoteException; + void addDatatype(TxnId txnId, Datatype datatype) throws AlgebricksException, RemoteException; /** * Retrieves the datatype with given name in given dataverse, acquiring local * locks on behalf of the given transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataverseName * Name of dataverse holding the datatype. @@ -311,14 +311,14 @@ public interface IMetadataNode extends Remote, Serializable { * For example, if the datatype does not exist. * @throws RemoteException */ - Datatype getDatatype(JobId jobId, String dataverseName, String datatypeName) + Datatype getDatatype(TxnId txnId, String dataverseName, String datatypeName) throws AlgebricksException, RemoteException; /** * Deletes the given datatype in given dataverse, acquiring local locks on * behalf of the given transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataverseName * Name of dataverse holding the datatype. @@ -329,14 +329,14 @@ public interface IMetadataNode extends Remote, Serializable { * deleted. * @throws RemoteException */ - void dropDatatype(JobId jobId, String dataverseName, String datatypeName) + void dropDatatype(TxnId txnId, String dataverseName, String datatypeName) throws AlgebricksException, RemoteException; /** * Inserts a node group, acquiring local locks on behalf of the given * transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param nodeGroup * Node group instance to insert. @@ -344,13 +344,13 @@ public interface IMetadataNode extends Remote, Serializable { * For example, if the node group already exists. * @throws RemoteException */ - void addNodeGroup(JobId jobId, NodeGroup nodeGroup) throws AlgebricksException, RemoteException; + void addNodeGroup(TxnId txnId, NodeGroup nodeGroup) throws AlgebricksException, RemoteException; /** * Retrieves a node group, acquiring local locks on behalf of the given * transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param nodeGroupName * Name of node group to be retrieved. @@ -358,13 +358,13 @@ public interface IMetadataNode extends Remote, Serializable { * For example, if the node group does not exist. * @throws RemoteException */ - NodeGroup getNodeGroup(JobId jobId, String nodeGroupName) throws AlgebricksException, RemoteException; + NodeGroup getNodeGroup(TxnId txnId, String nodeGroupName) throws AlgebricksException, RemoteException; /** * Deletes a node group, acquiring local locks on behalf of the given * transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param nodeGroupName * Name of node group to be deleted. @@ -377,14 +377,14 @@ public interface IMetadataNode extends Remote, Serializable { * group to be deleted. * @throws RemoteException */ - boolean dropNodegroup(JobId jobId, String nodeGroupName, boolean failSilently) + boolean dropNodegroup(TxnId txnId, String nodeGroupName, boolean failSilently) throws AlgebricksException, RemoteException; /** * Inserts a node (compute node), acquiring local locks on behalf of the given * transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param node * Node instance to be inserted. @@ -392,10 +392,10 @@ public interface IMetadataNode extends Remote, Serializable { * For example, if the node already exists. * @throws RemoteException */ - void addNode(JobId jobId, Node node) throws AlgebricksException, RemoteException; + void addNode(TxnId txnId, Node node) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param functionSignature * An instance of functionSignature representing the function @@ -403,15 +403,15 @@ public interface IMetadataNode extends Remote, Serializable { * @throws AlgebricksException * @throws RemoteException */ - Function getFunction(JobId jobId, FunctionSignature functionSignature) throws AlgebricksException, RemoteException; + Function getFunction(TxnId txnId, FunctionSignature functionSignature) throws AlgebricksException, RemoteException; - List<Function> getFunctions(JobId jobId, String dataverseName) throws AlgebricksException, RemoteException; + List<Function> getFunctions(TxnId txnId, String dataverseName) throws AlgebricksException, RemoteException; /** * Deletes a function, acquiring local locks on behalf of the given transaction * id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param functionSignature * An instance of functionSignature representing the function @@ -420,10 +420,10 @@ public interface IMetadataNode extends Remote, Serializable { * group to be deleted. * @throws RemoteException */ - void dropFunction(JobId jobId, FunctionSignature functionSignature) throws AlgebricksException, RemoteException; + void dropFunction(TxnId txnId, FunctionSignature functionSignature) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param function * Function to be inserted @@ -432,45 +432,45 @@ public interface IMetadataNode extends Remote, Serializable { * unknown function * @throws RemoteException */ - void addFunction(JobId jobId, Function function) throws AlgebricksException, RemoteException; + void addFunction(TxnId txnId, Function function) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * @param dataverseName * @return List<Function> A list containing the functions in the specified * dataverse * @throws AlgebricksException * @throws RemoteException */ - List<Function> getDataverseFunctions(JobId jobId, String dataverseName) throws AlgebricksException, RemoteException; + List<Function> getDataverseFunctions(TxnId txnId, String dataverseName) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * @param dataverseName * @return List<Adapter> A list containing the adapters in the specified * dataverse * @throws AlgebricksException * @throws RemoteException */ - List<DatasourceAdapter> getDataverseAdapters(JobId jobId, String dataverseName) + List<DatasourceAdapter> getDataverseAdapters(TxnId txnId, String dataverseName) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * @param dataverseName * @param adapterName * @return * @throws AlgebricksException * @throws RemoteException */ - DatasourceAdapter getAdapter(JobId jobId, String dataverseName, String adapterName) + DatasourceAdapter getAdapter(TxnId txnId, String dataverseName, String adapterName) throws AlgebricksException, RemoteException; /** * Deletes a adapter , acquiring local locks on behalf of the given transaction * id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataverseName * dataverse asociated with the adapter that is to be deleted. @@ -480,10 +480,10 @@ public interface IMetadataNode extends Remote, Serializable { * @throws AlgebricksException * @throws RemoteException */ - void dropAdapter(JobId jobId, String dataverseName, String adapterName) throws AlgebricksException, RemoteException; + void dropAdapter(TxnId txnId, String dataverseName, String adapterName) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param adapter * Adapter to be inserted @@ -491,34 +491,34 @@ public interface IMetadataNode extends Remote, Serializable { * for example, if the adapter already exists. * @throws RemoteException */ - void addAdapter(JobId jobId, DatasourceAdapter adapter) throws AlgebricksException, RemoteException; + void addAdapter(TxnId txnId, DatasourceAdapter adapter) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * @param compactionPolicy * @throws AlgebricksException * @throws RemoteException */ - void addCompactionPolicy(JobId jobId, CompactionPolicy compactionPolicy) + void addCompactionPolicy(TxnId txnId, CompactionPolicy compactionPolicy) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * @param dataverse * @param policy * @return * @throws AlgebricksException * @throws RemoteException */ - CompactionPolicy getCompactionPolicy(JobId jobId, String dataverse, String policy) + CompactionPolicy getCompactionPolicy(TxnId txnId, String dataverse, String policy) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * @throws AlgebricksException * @throws RemoteException */ - void initializeDatasetIdFactory(JobId jobId) throws AlgebricksException, RemoteException; + void initializeDatasetIdFactory(TxnId txnId) throws AlgebricksException, RemoteException; /** * @return @@ -528,58 +528,58 @@ public interface IMetadataNode extends Remote, Serializable { int getMostRecentDatasetId() throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * @param feed * @throws AlgebricksException * @throws RemoteException */ - void addFeed(JobId jobId, Feed feed) throws AlgebricksException, RemoteException; + void addFeed(TxnId txnId, Feed feed) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * @param dataverse * @param feedName * @return * @throws AlgebricksException * @throws RemoteException */ - Feed getFeed(JobId jobId, String dataverse, String feedName) throws AlgebricksException, RemoteException; + Feed getFeed(TxnId txnId, String dataverse, String feedName) throws AlgebricksException, RemoteException; - List<Feed> getFeeds(JobId jobId, String dataverse) throws AlgebricksException, RemoteException; + List<Feed> getFeeds(TxnId txnId, String dataverse) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * @param dataverse * @param feedName * @throws AlgebricksException * @throws RemoteException */ - void dropFeed(JobId jobId, String dataverse, String feedName) throws AlgebricksException, RemoteException; + void dropFeed(TxnId txnId, String dataverse, String feedName) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * @param feedPolicy * @throws AlgebricksException * @throws RemoteException */ - void addFeedPolicy(JobId jobId, FeedPolicyEntity feedPolicy) throws AlgebricksException, RemoteException; + void addFeedPolicy(TxnId txnId, FeedPolicyEntity feedPolicy) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * @param dataverse * @param policy * @return * @throws AlgebricksException * @throws RemoteException */ - FeedPolicyEntity getFeedPolicy(JobId jobId, String dataverse, String policy) + FeedPolicyEntity getFeedPolicy(TxnId txnId, String dataverse, String policy) throws AlgebricksException, RemoteException; /** * Removes a library , acquiring local locks on behalf of the given transaction * id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataverseName * dataverse asociated with the adapter that is to be deleted. @@ -589,12 +589,12 @@ public interface IMetadataNode extends Remote, Serializable { * @throws AlgebricksException * @throws RemoteException */ - void dropLibrary(JobId jobId, String dataverseName, String libraryName) throws AlgebricksException, RemoteException; + void dropLibrary(TxnId txnId, String dataverseName, String libraryName) throws AlgebricksException, RemoteException; /** * Adds a library, acquiring local locks on behalf of the given transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param library * Library to be added @@ -602,10 +602,10 @@ public interface IMetadataNode extends Remote, Serializable { * for example, if the library is already added. * @throws RemoteException */ - void addLibrary(JobId jobId, Library library) throws AlgebricksException, RemoteException; + void addLibrary(TxnId txnId, Library library) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataverseName * dataverse asociated with the library that is to be retrieved. @@ -615,7 +615,7 @@ public interface IMetadataNode extends Remote, Serializable { * @throws AlgebricksException * @throws RemoteException */ - Library getLibrary(JobId jobId, String dataverseName, String libraryName) + Library getLibrary(TxnId txnId, String dataverseName, String libraryName) throws AlgebricksException, RemoteException; /** @@ -629,42 +629,42 @@ public interface IMetadataNode extends Remote, Serializable { * @throws AlgebricksException * @throws RemoteException */ - List<Library> getDataverseLibraries(JobId jobId, String dataverseName) throws AlgebricksException, RemoteException; + List<Library> getDataverseLibraries(TxnId txnId, String dataverseName) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * @param dataverseName * @return * @throws AlgebricksException * @throws RemoteException */ - List<Feed> getDataverseFeeds(JobId jobId, String dataverseName) throws AlgebricksException, RemoteException; + List<Feed> getDataverseFeeds(TxnId txnId, String dataverseName) throws AlgebricksException, RemoteException; /** * delete a give feed (ingestion) policy * - * @param jobId + * @param txnId * @param dataverseName * @param policyName * @return * @throws RemoteException * @throws AlgebricksException */ - void dropFeedPolicy(JobId jobId, String dataverseName, String policyName) + void dropFeedPolicy(TxnId txnId, String dataverseName, String policyName) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * @param dataverse * @return * @throws AlgebricksException * @throws RemoteException */ - List<FeedPolicyEntity> getDataversePolicies(JobId jobId, String dataverse) + List<FeedPolicyEntity> getDataversePolicies(TxnId txnId, String dataverse) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param externalFile * An object representing the external file entity @@ -672,23 +672,23 @@ public interface IMetadataNode extends Remote, Serializable { * for example, if the file already exists. * @throws RemoteException */ - void addExternalFile(JobId jobId, ExternalFile externalFile) throws AlgebricksException, RemoteException; + void addExternalFile(TxnId txnId, ExternalFile externalFile) throws AlgebricksException, RemoteException; /** - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataset * A dataset the files belongs to. * @throws AlgebricksException * @throws RemoteException */ - List<ExternalFile> getExternalFiles(JobId jobId, Dataset dataset) throws AlgebricksException, RemoteException; + List<ExternalFile> getExternalFiles(TxnId txnId, Dataset dataset) throws AlgebricksException, RemoteException; /** * Deletes an externalFile , acquiring local locks on behalf of the given * transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataverseName * dataverse asociated with the external dataset that owns the file @@ -700,27 +700,27 @@ public interface IMetadataNode extends Remote, Serializable { * @throws AlgebricksException * @throws RemoteException */ - void dropExternalFile(JobId jobId, String dataverseName, String datasetName, int fileNumber) + void dropExternalFile(TxnId txnId, String dataverseName, String datasetName, int fileNumber) throws AlgebricksException, RemoteException; /** * Deletes all external files belonging to a dataset, acquiring local locks on * behalf of the given transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataset * An external dataset the files belong to. * @throws AlgebricksException * @throws RemoteException */ - void dropExternalFiles(JobId jobId, Dataset dataset) throws AlgebricksException, RemoteException; + void dropExternalFiles(TxnId txnId, Dataset dataset) throws AlgebricksException, RemoteException; /** * Retrieves the file with given number, in given dataverse and dataset, * acquiring local locks on behalf of the given transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataverseName * Name of the datavers holding the given dataset. @@ -733,14 +733,14 @@ public interface IMetadataNode extends Remote, Serializable { * For example, if the index does not exist. * @throws RemoteException */ - ExternalFile getExternalFile(JobId jobId, String dataverseName, String datasetName, Integer fileNumber) + ExternalFile getExternalFile(TxnId txnId, String dataverseName, String datasetName, Integer fileNumber) throws AlgebricksException, RemoteException; /** * update an existing dataset in the metadata, acquiring local locks on behalf * of the given transaction id. * - * @param jobId + * @param txnId * A globally unique id for an active metadata transaction. * @param dataset * updated Dataset instance. @@ -748,63 +748,63 @@ public interface IMetadataNode extends Remote, Serializable { * For example, if the dataset already exists. * @throws RemoteException */ - void updateDataset(JobId jobId, Dataset dataset) throws AlgebricksException, RemoteException; + void updateDataset(TxnId txnId, Dataset dataset) throws AlgebricksException, RemoteException; /** * Adds an extension entity under the ongoing transaction job id * - * @param jobId + * @param txnId * @param entity * @throws AlgebricksException * @throws RemoteException */ - <T extends IExtensionMetadataEntity> void addEntity(JobId jobId, T entity) + <T extends IExtensionMetadataEntity> void addEntity(TxnId txnId, T entity) throws AlgebricksException, RemoteException; /** * Upserts an extension entity under the ongoing transaction job id * - * @param jobId + * @param txnId * @param entity * @throws AlgebricksException * @throws RemoteException */ - <T extends IExtensionMetadataEntity> void upsertEntity(JobId jobId, T entity) + <T extends IExtensionMetadataEntity> void upsertEntity(TxnId txnId, T entity) throws AlgebricksException, RemoteException; /** * Deletes an extension entity under the ongoing transaction job id * - * @param jobId + * @param txnId * @param entity * @throws AlgebricksException * @throws RemoteException */ - <T extends IExtensionMetadataEntity> void deleteEntity(JobId jobId, T entity) + <T extends IExtensionMetadataEntity> void deleteEntity(TxnId txnId, T entity) throws AlgebricksException, RemoteException; /** * Gets a list of extension entities matching a search key under the ongoing * transaction * - * @param jobId + * @param txnId * @param searchKey * @return * @throws AlgebricksException * @throws RemoteException */ - <T extends IExtensionMetadataEntity> List<T> getEntities(JobId jobId, IExtensionMetadataSearchKey searchKey) + <T extends IExtensionMetadataEntity> List<T> getEntities(TxnId txnId, IExtensionMetadataSearchKey searchKey) throws AlgebricksException, RemoteException; - void addFeedConnection(JobId jobId, FeedConnection feedConnection) throws AlgebricksException, RemoteException; + void addFeedConnection(TxnId txnId, FeedConnection feedConnection) throws AlgebricksException, RemoteException; - FeedConnection getFeedConnection(JobId jobId, String dataverseName, String feedName, String datasetName) + FeedConnection getFeedConnection(TxnId txnId, String dataverseName, String feedName, String datasetName) throws AlgebricksException, RemoteException; - void dropFeedConnection(JobId jobId, String dataverseName, String feedName, String datasetName) + void dropFeedConnection(TxnId txnId, String dataverseName, String feedName, String datasetName) throws AlgebricksException, RemoteException; - List<FeedConnection> getFeedConnections(JobId jobId, String dataverseName, String feedName) + List<FeedConnection> getFeedConnections(TxnId txnId, String dataverseName, String feedName) throws AlgebricksException, RemoteException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IValueExtractor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IValueExtractor.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IValueExtractor.java index 7d19b20..4cc7719 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IValueExtractor.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IValueExtractor.java @@ -22,7 +22,7 @@ package org.apache.asterix.metadata.api; import java.io.IOException; import java.rmi.RemoteException; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; @@ -38,7 +38,7 @@ public interface IValueExtractor<T> { /** * Extracts an object of type T from a given tuple. * - * @param jobId + * @param txnId * A globally unique transaction id. * @param tuple * Tuple from which an object shall be extracted. @@ -47,5 +47,5 @@ public interface IValueExtractor<T> { * @throws HyracksDataException * @throws IOException */ - T getValue(JobId jobId, ITupleReference tuple) throws AlgebricksException, HyracksDataException, RemoteException; + T getValue(TxnId txnId, ITupleReference tuple) throws AlgebricksException, HyracksDataException, RemoteException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 463c96b..71ed913 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -37,7 +37,7 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.metadata.LockList; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; import org.apache.asterix.external.adapter.factory.LookupAdapterFactory; @@ -156,7 +156,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> private boolean asyncResults; private ResultSetId resultSetId; private IResultSerializerFactoryProvider resultSerializerFactoryProvider; - private JobId jobId; + private TxnId txnId; private Map<String, Integer> externalDataLocks; private boolean isTemporaryDatasetWriteJob = true; private boolean blockingOperatorDisabled = false; @@ -188,8 +188,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> return config; } - public void setJobId(JobId jobId) { - this.jobId = jobId; + public void setTxnId(TxnId txnId) { + this.txnId = txnId; } public Dataverse getDefaultDataverse() { @@ -452,7 +452,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( - storageComponentProvider, theIndex, jobId, IndexOperation.SEARCH, primaryKeyFields); + storageComponentProvider, theIndex, txnId, IndexOperation.SEARCH, primaryKeyFields); IStorageManager storageManager = getStorageComponentProvider().getStorageManager(); IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(storageManager, spPc.first); BTreeSearchOperatorDescriptor btreeSearchOp; @@ -491,7 +491,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( - storageComponentProvider, secondaryIndex, jobId, IndexOperation.SEARCH, primaryKeyFields); + storageComponentProvider, secondaryIndex, txnId, IndexOperation.SEARCH, primaryKeyFields); RTreeSearchOperatorDescriptor rtreeSearchOp; IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), spPc.first); @@ -735,8 +735,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } } - public JobId getJobId() { - return jobId; + public TxnId getTxnId() { + return txnId; } public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields) @@ -797,7 +797,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> // files index RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); ISearchOperationCallbackFactory searchOpCallbackFactory = dataset - .getSearchCallbackFactory(storageComponentProvider, fileIndex, jobId, IndexOperation.SEARCH, null); + .getSearchCallbackFactory(storageComponentProvider, fileIndex, txnId, IndexOperation.SEARCH, null); // Create the operator ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory, outRecDesc, indexDataflowHelperFactory, searchOpCallbackFactory, @@ -972,7 +972,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> primaryKeyFields[i] = i; } IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( - storageComponentProvider, primaryIndex, jobId, indexOp, primaryKeyFields); + storageComponentProvider, primaryIndex, txnId, indexOp, primaryKeyFields); IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); IOperatorDescriptor op; @@ -1097,9 +1097,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); // prepare callback - JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); + TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId(); IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( - storageComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields); + storageComponentProvider, secondaryIndex, txnId, indexOp, modificationCallbackPrimaryKeyFields); IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory( storageComponentProvider.getStorageManager(), splitsAndConstraint.first); IOperatorDescriptor op; @@ -1198,9 +1198,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); // prepare callback - JobId planJobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); + TxnId planTxnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId(); IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( - storageComponentProvider, secondaryIndex, planJobId, indexOp, modificationCallbackPrimaryKeyFields); + storageComponentProvider, secondaryIndex, planTxnId, indexOp, modificationCallbackPrimaryKeyFields); IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); IOperatorDescriptor op; @@ -1311,9 +1311,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); // prepare callback - JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); + TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId(); IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( - storageComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields); + storageComponentProvider, secondaryIndex, txnId, indexOp, modificationCallbackPrimaryKeyFields); IIndexDataflowHelperFactory indexDataFlowFactory = new IndexDataflowHelperFactory( storageComponentProvider.getStorageManager(), splitsAndConstraint.first); IOperatorDescriptor op; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/BuiltinTypeMap.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/BuiltinTypeMap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/BuiltinTypeMap.java index 22732d3..a531add 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/BuiltinTypeMap.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/BuiltinTypeMap.java @@ -26,7 +26,7 @@ import java.util.Map; import java.util.Set; import org.apache.asterix.common.exceptions.MetadataException; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.metadata.MetadataNode; import org.apache.asterix.om.types.AUnionType; import org.apache.asterix.om.types.BuiltinType; @@ -93,12 +93,12 @@ public class BuiltinTypeMap { return new HashSet<>(_builtinTypeMap.values()); } - public static IAType getTypeFromTypeName(MetadataNode metadataNode, JobId jobId, String dataverseName, + public static IAType getTypeFromTypeName(MetadataNode metadataNode, TxnId txnId, String dataverseName, String typeName, boolean optional) throws AlgebricksException { IAType type = _builtinTypeMap.get(typeName); if (type == null) { try { - Datatype dt = metadataNode.getDatatype(jobId, dataverseName, typeName); + Datatype dt = metadataNode.getDatatype(txnId, dataverseName, typeName); type = dt.getDatatype(); } catch (RemoteException e) { throw new MetadataException(e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index e5f97f0..48a6e6a 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -42,7 +42,7 @@ import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallba import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory; import org.apache.asterix.common.metadata.IDataset; import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.utils.JobUtils; import org.apache.asterix.common.utils.JobUtils.ProgressState; import org.apache.asterix.external.feed.management.FeedConnectionId; @@ -543,7 +543,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { * * @param index * the index - * @param jobId + * @param txnId * the job id being compiled * @param op * the operation this search is part of @@ -555,7 +555,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { * if the callback factory could not be created */ public ISearchOperationCallbackFactory getSearchCallbackFactory(IStorageComponentProvider storageComponentProvider, - Index index, JobId jobId, IndexOperation op, int[] primaryKeyFields) throws AlgebricksException { + Index index, TxnId txnId, IndexOperation op, int[] primaryKeyFields) throws AlgebricksException { if (getDatasetDetails().isTemp()) { return NoOpOperationCallbackFactory.INSTANCE; } else if (index.isPrimaryIndex()) { @@ -564,14 +564,14 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { * we may acquire very short duration lock(i.e., instant lock) for readers. */ return (op == IndexOperation.UPSERT) - ? new LockThenSearchOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields, + ? new LockThenSearchOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields, storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE) - : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields, + : new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields, storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE); } else if (index.getKeyFieldNames().isEmpty()) { // this is the case where the index is secondary primary index and locking is required // since the secondary primary index replaces the dataset index (which locks) - return new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields, + return new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields, storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE); } return new SecondaryIndexSearchOperationCallbackFactory(); @@ -582,7 +582,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { * * @param index * the index - * @param jobId + * @param txnId * the job id of the job being compiled * @param op * the operation performed for this callback @@ -594,31 +594,31 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { * If the callback factory could not be created */ public IModificationOperationCallbackFactory getModificationCallbackFactory( - IStorageComponentProvider componentProvider, Index index, JobId jobId, IndexOperation op, + IStorageComponentProvider componentProvider, Index index, TxnId txnId, IndexOperation op, int[] primaryKeyFields) throws AlgebricksException { if (getDatasetDetails().isTemp()) { return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT ? index.isPrimaryIndex() - ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, + ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(txnId, datasetId, primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), Operation.get(op), index.resourceType()) - : new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), + : new TempDatasetSecondaryIndexModificationOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), Operation.get(op), index.resourceType()) : NoOpOperationCallbackFactory.INSTANCE; } else if (index.isPrimaryIndex()) { return op == IndexOperation.UPSERT - ? new UpsertOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields, + ? new UpsertOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), Operation.get(op), index.resourceType()) : op == IndexOperation.DELETE || op == IndexOperation.INSERT - ? new PrimaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), + ? new PrimaryIndexModificationOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), Operation.get(op), index.resourceType()) : NoOpOperationCallbackFactory.INSTANCE; } else { return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT - ? new SecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields, + ? new SecondaryIndexModificationOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), Operation.get(op), index.resourceType()) : NoOpOperationCallbackFactory.INSTANCE; @@ -665,7 +665,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { * * @param metadataProvider, * the metadata provider. - * @param jobId, + * @param txnId, * the AsterixDB job id for transaction management. * @param primaryKeyFieldPermutation, * the primary key field permutation according to the input. @@ -674,10 +674,10 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { * @return the commit runtime factory for inserting/upserting/deleting operations on this dataset. * @throws AlgebricksException */ - public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider, JobId jobId, + public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider, TxnId txnId, int[] primaryKeyFieldPermutation, boolean isSink) throws AlgebricksException { int[] datasetPartitions = getDatasetPartitions(metadataProvider); - return new CommitRuntimeFactory(jobId, datasetId, primaryKeyFieldPermutation, + return new CommitRuntimeFactory(txnId, datasetId, primaryKeyFieldPermutation, metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), datasetPartitions, isSink); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatatypeTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatatypeTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatatypeTupleTranslator.java index f75ea8f..6d7e25f 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatatypeTupleTranslator.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatatypeTupleTranslator.java @@ -29,7 +29,7 @@ import java.util.Calendar; import org.apache.asterix.builders.IARecordBuilder; import org.apache.asterix.builders.OrderedListBuilder; import org.apache.asterix.builders.RecordBuilder; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.metadata.MetadataNode; import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes; @@ -79,11 +79,11 @@ public class DatatypeTupleTranslator extends AbstractTupleTranslator<Datatype> { private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(MetadataRecordTypes.DATATYPE_RECORDTYPE); private final MetadataNode metadataNode; - private final JobId jobId; + private final TxnId txnId; - protected DatatypeTupleTranslator(JobId jobId, MetadataNode metadataNode, boolean getTuple) { + protected DatatypeTupleTranslator(TxnId txnId, MetadataNode metadataNode, boolean getTuple) { super(getTuple, MetadataPrimaryIndexes.DATATYPE_DATASET.getFieldCount()); - this.jobId = jobId; + this.txnId = txnId; this.metadataNode = metadataNode; } @@ -142,7 +142,7 @@ public class DatatypeTupleTranslator extends AbstractTupleTranslator<Datatype> { boolean isNullable = ((ABoolean) field .getValueByPos(MetadataRecordTypes.FIELD_ARECORD_ISNULLABLE_FIELD_INDEX)).getBoolean() .booleanValue(); - fieldTypes[fieldId] = BuiltinTypeMap.getTypeFromTypeName(metadataNode, jobId, dataverseName, + fieldTypes[fieldId] = BuiltinTypeMap.getTypeFromTypeName(metadataNode, txnId, dataverseName, fieldTypeName, isNullable); fieldId++; } @@ -154,7 +154,7 @@ public class DatatypeTupleTranslator extends AbstractTupleTranslator<Datatype> { .getValueByPos(MetadataRecordTypes.DERIVEDTYPE_ARECORD_UNORDEREDLIST_FIELD_INDEX)) .getStringValue(); return new Datatype(dataverseName, datatypeName, - new AUnorderedListType(BuiltinTypeMap.getTypeFromTypeName(metadataNode, jobId, + new AUnorderedListType(BuiltinTypeMap.getTypeFromTypeName(metadataNode, txnId, dataverseName, unorderedlistTypeName, false), datatypeName), isAnonymous); } @@ -163,7 +163,7 @@ public class DatatypeTupleTranslator extends AbstractTupleTranslator<Datatype> { .getValueByPos(MetadataRecordTypes.DERIVEDTYPE_ARECORD_ORDEREDLIST_FIELD_INDEX)) .getStringValue(); return new Datatype(dataverseName, datatypeName, - new AOrderedListType(BuiltinTypeMap.getTypeFromTypeName(metadataNode, jobId, dataverseName, + new AOrderedListType(BuiltinTypeMap.getTypeFromTypeName(metadataNode, txnId, dataverseName, orderedlistTypeName, false), datatypeName), isAnonymous); } @@ -365,7 +365,7 @@ public class DatatypeTupleTranslator extends AbstractTupleTranslator<Datatype> { private String handleNestedDerivedType(String typeName, AbstractComplexType nestedType, Datatype topLevelType, String dataverseName, String datatypeName) throws HyracksDataException { try { - metadataNode.addDatatype(jobId, new Datatype(dataverseName, typeName, nestedType, true)); + metadataNode.addDatatype(txnId, new Datatype(dataverseName, typeName, nestedType, true)); } catch (AlgebricksException e) { // The nested record type may have been inserted by a previous DDL statement or // by http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java index 8989cdc..a154d7f 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java @@ -29,7 +29,7 @@ import java.util.List; import org.apache.asterix.builders.OrderedListBuilder; import org.apache.asterix.common.config.DatasetConfig.IndexType; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.metadata.MetadataNode; import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes; @@ -94,11 +94,11 @@ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { private ISerializerDeserializer<ARecord> recordSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(MetadataRecordTypes.INDEX_RECORDTYPE); private final MetadataNode metadataNode; - private final JobId jobId; + private final TxnId txnId; - protected IndexTupleTranslator(JobId jobId, MetadataNode metadataNode, boolean getTuple) { + protected IndexTupleTranslator(TxnId txnId, MetadataNode metadataNode, boolean getTuple) { super(getTuple, MetadataPrimaryIndexes.INDEX_DATASET.getFieldCount()); - this.jobId = jobId; + this.txnId = txnId; this.metadataNode = metadataNode; } @@ -141,7 +141,7 @@ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { List<IAType> searchKeyType = new ArrayList<>(searchKey.size()); while (fieldTypeCursor.next()) { String typeName = ((AString) fieldTypeCursor.get()).getStringValue(); - IAType fieldType = BuiltinTypeMap.getTypeFromTypeName(metadataNode, jobId, dvName, typeName, false); + IAType fieldType = BuiltinTypeMap.getTypeFromTypeName(metadataNode, txnId, dvName, typeName, false); searchKeyType.add(fieldType); } boolean isOverridingKeyTypes = !searchKeyType.isEmpty(); @@ -180,16 +180,16 @@ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { // from the record metadata if (searchKeyType.isEmpty()) { try { - Dataset dSet = metadataNode.getDataset(jobId, dvName, dsName); + Dataset dSet = metadataNode.getDataset(txnId, dvName, dsName); String datatypeName = dSet.getItemTypeName(); String datatypeDataverseName = dSet.getItemTypeDataverseName(); ARecordType recordDt = (ARecordType) metadataNode - .getDatatype(jobId, datatypeDataverseName, datatypeName).getDatatype(); + .getDatatype(txnId, datatypeDataverseName, datatypeName).getDatatype(); String metatypeName = dSet.getMetaItemTypeName(); String metatypeDataverseName = dSet.getMetaItemTypeDataverseName(); ARecordType metaDt = null; if (metatypeName != null && metatypeDataverseName != null) { - metaDt = (ARecordType) metadataNode.getDatatype(jobId, metatypeDataverseName, metatypeName) + metaDt = (ARecordType) metadataNode.getDatatype(txnId, metatypeDataverseName, metatypeName) .getDatatype(); } searchKeyType = KeyFieldTypeUtil.getKeyTypes(recordDt, metaDt, searchKey, keyFieldSourceIndicator); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/MetadataTupleTranslatorProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/MetadataTupleTranslatorProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/MetadataTupleTranslatorProvider.java index 20f04c4..0625fc4 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/MetadataTupleTranslatorProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/MetadataTupleTranslatorProvider.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.metadata.entitytupletranslators; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.metadata.MetadataNode; public class MetadataTupleTranslatorProvider { @@ -35,9 +35,9 @@ public class MetadataTupleTranslatorProvider { return new DatasourceAdapterTupleTranslator(getTuple); } - public DatatypeTupleTranslator getDataTypeTupleTranslator(JobId jobId, MetadataNode metadataNode, + public DatatypeTupleTranslator getDataTypeTupleTranslator(TxnId txnId, MetadataNode metadataNode, boolean getTuple) { - return new DatatypeTupleTranslator(jobId, metadataNode, getTuple); + return new DatatypeTupleTranslator(txnId, metadataNode, getTuple); } public DataverseTupleTranslator getDataverseTupleTranslator(boolean getTuple) { @@ -60,8 +60,8 @@ public class MetadataTupleTranslatorProvider { return new FunctionTupleTranslator(getTuple); } - public IndexTupleTranslator getIndexTupleTranslator(JobId jobId, MetadataNode metadataNode, boolean getTuple) { - return new IndexTupleTranslator(jobId, metadataNode, getTuple); + public IndexTupleTranslator getIndexTupleTranslator(TxnId txnId, MetadataNode metadataNode, boolean getTuple) { + return new IndexTupleTranslator(txnId, metadataNode, getTuple); } public LibraryTupleTranslator getLibraryTupleTranslator(boolean getTuple) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index 2c457a9..07d3c69 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -38,7 +38,7 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.transactions.IRecoveryManager; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.formats.base.IDataFormat; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; @@ -337,13 +337,13 @@ public class DatasetUtil { * the metadata provider. * @param dataset, * the dataset to scan. - * @param jobId, + * @param txnId, * the AsterixDB job id for transaction management. * @return a primary index scan operator. * @throws AlgebricksException */ public static IOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider metadataProvider, - Dataset dataset, JobId jobId) throws AlgebricksException { + Dataset dataset, TxnId txnId) throws AlgebricksException { Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider.getSplitProviderAndConstraints(dataset); IFileSplitProvider primaryFileSplitProvider = primarySplitsAndConstraint.first; @@ -355,7 +355,7 @@ public class DatasetUtil { ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE; boolean temp = dataset.getDatasetDetails().isTemp(); ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE - : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, dataset.getDatasetId(), + : new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, dataset.getDatasetId(), dataset.getPrimaryBloomFilterFields(), txnSubsystemProvider, IRecoveryManager.ResourceType.LSM_BTREE); IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( @@ -399,7 +399,7 @@ public class DatasetUtil { metadataProvider.getSplitProviderAndConstraints(dataset); // prepare callback - JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); + TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId(); int[] primaryKeyFields = new int[numKeys]; for (int i = 0; i < numKeys; i++) { primaryKeyFields[i] = i; @@ -408,9 +408,9 @@ public class DatasetUtil { metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()).size() > 1; IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( - storageComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields); + storageComponentProvider, primaryIndex, txnId, IndexOperation.UPSERT, primaryKeyFields); ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( - storageComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields); + storageComponentProvider, primaryIndex, txnId, IndexOperation.UPSERT, primaryKeyFields); IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); LSMPrimaryUpsertOperatorDescriptor op; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java index cc6923e..e6a24e3 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java @@ -28,20 +28,19 @@ import org.apache.asterix.common.config.DatasetConfig; import org.apache.asterix.common.config.OptimizationConfUtil; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; -import org.apache.asterix.transaction.management.service.transaction.JobIdFactory; +import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor; public class IndexUtil { @@ -162,13 +161,13 @@ public class IndexUtil { * the metadata provider. * @return the AsterixDB job id for transaction management. */ - public static JobId bindJobEventListener(JobSpecification spec, MetadataProvider metadataProvider) { - JobId jobId = JobIdFactory.generateJobId(); - metadataProvider.setJobId(jobId); + public static TxnId bindJobEventListener(JobSpecification spec, MetadataProvider metadataProvider) { + TxnId txnId = TxnIdFactory.create(); + metadataProvider.setTxnId(txnId); boolean isWriteTransaction = metadataProvider.isWriteTransaction(); - IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction); + IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, isWriteTransaction); spec.setJobletEventListenerFactory(jobEventListenerFactory); - return jobId; + return txnId; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java index 78d9c19..8f70f21 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java @@ -22,7 +22,7 @@ import java.util.List; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.GlobalConfig; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -129,11 +129,11 @@ public class SecondaryBTreeOperationsHelper extends SecondaryTreeIndexOperations // Create dummy key provider for feeding the primary index scan. IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); - JobId jobId = IndexUtil.bindJobEventListener(spec, metadataProvider); + TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider); // Create primary index scan op. IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset, - jobId); + txnId); // Assign op. IOperatorDescriptor sourceOp = primaryScanOp; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java index 8ef5f34..89bd4b1 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java @@ -21,7 +21,7 @@ package org.apache.asterix.metadata.utils; import java.util.List; import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; @@ -72,14 +72,14 @@ public class SecondaryCorrelatedBTreeOperationsHelper extends SecondaryCorrelate // only handle internal datasets // Create dummy key provider for feeding the primary index scan. - JobId jobId = IndexUtil.bindJobEventListener(spec, metadataProvider); + TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider); // Create dummy key provider for feeding the primary index scan. IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); // Create primary index scan op. IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider, - getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), jobId); + getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), txnId); // Assign op. IOperatorDescriptor sourceOp = primaryScanOp; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java index 262b259..93cc11d 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java @@ -21,7 +21,7 @@ package org.apache.asterix.metadata.utils; import org.apache.asterix.common.config.DatasetConfig.IndexType; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; @@ -206,14 +206,14 @@ public class SecondaryCorrelatedInvertedIndexOperationsHelper extends SecondaryC @Override public JobSpecification buildLoadingJobSpec() throws AlgebricksException { JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - JobId jobId = IndexUtil.bindJobEventListener(spec, metadataProvider); + TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider); // Create dummy key provider for feeding the primary index scan. IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); // Create primary index scan op. IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider, - getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), jobId); + getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), txnId); IOperatorDescriptor sourceOp = primaryScanOp; boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java index 9106193..1333493 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java @@ -23,7 +23,7 @@ import java.util.List; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.formats.nontagged.TypeTraitProvider; @@ -184,11 +184,11 @@ public class SecondaryCorrelatedRTreeOperationsHelper extends SecondaryCorrelate // Create dummy key provider for feeding the primary index scan. IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); - JobId jobId = IndexUtil.bindJobEventListener(spec, metadataProvider); + TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider); // Create primary index scan op. IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider, - getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), jobId); + getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), txnId); // Assign op. IOperatorDescriptor sourceOp = primaryScanOp; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java index b99ae2f..994370c 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java @@ -24,14 +24,12 @@ import org.apache.asterix.common.context.TransactionSubsystemProvider; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.transactions.IRecoveryManager; -import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.om.functions.BuiltinFunctions; -import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.runtime.operators.LSMSecondaryIndexBulkLoadOperatorDescriptor; import org.apache.asterix.runtime.operators.LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor; import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory; @@ -276,11 +274,11 @@ public abstract class SecondaryCorrelatedTreeIndexOperationsHelper extends Secon } protected IOperatorDescriptor createPrimaryIndexScanDiskComponentsOp(JobSpecification spec, - MetadataProvider metadataProvider, RecordDescriptor outRecDesc, JobId jobId) throws AlgebricksException { + MetadataProvider metadataProvider, RecordDescriptor outRecDesc, TxnId txnId) throws AlgebricksException { ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE; boolean temp = dataset.getDatasetDetails().isTemp(); ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE - : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, dataset.getDatasetId(), + : new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, dataset.getDatasetId(), dataset.getPrimaryBloomFilterFields(), txnSubsystemProvider, IRecoveryManager.ResourceType.LSM_BTREE); IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
