HIVE-18192: Introduce WriteID per table rather than using global transaction ID (Sankar Hariappan, reviewed by Eugene Koifman)
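
A minimal sketch of the allocation flow this patch introduces, pieced together from the
metastore client calls that appear in the diff below (openTxns, allocateTableWriteIdsBatch,
TxnToWriteId, commitTxn). The client construction, user name, and database/table names are
illustrative assumptions, not part of the patch:

  import java.util.List;

  import org.apache.hadoop.hive.common.JavaUtils;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
  import org.apache.hadoop.hive.metastore.IMetaStoreClient;
  import org.apache.hadoop.hive.metastore.api.TxnToWriteId;

  public class WriteIdAllocationSketch {
    public static void main(String[] args) throws Exception {
      // Assumed setup: a metastore client built from the local HiveConf.
      IMetaStoreClient msClient = new HiveMetaStoreClient(new HiveConf());

      // Transactions are still opened globally, exactly as before this patch.
      List<Long> txnIds = msClient.openTxns("streaming-agent", 5).getTxn_ids();

      // New in HIVE-18192: each open transaction is mapped to a write ID that is
      // scoped to the target table, so delta directories no longer carry the
      // global transaction ID.
      List<TxnToWriteId> txnToWriteIds =
          msClient.allocateTableWriteIdsBatch(txnIds, "default", "streamingnobuckets");

      for (TxnToWriteId tw : txnToWriteIds) {
        // Delta file naming and ROW__IDs are derived from getWriteId(), while
        // commit/abort/heartbeat keep using getTxnId().
        System.out.println(JavaUtils.txnIdToString(tw.getTxnId()) + " -> "
            + JavaUtils.writeIdToString(tw.getWriteId()));
        msClient.commitTxn(tw.getTxnId());
      }
    }
  }

On the read side the same per-table scoping shows up as
msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)) replacing
msClient.getValidTxns() in the tests further down.
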
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cbb9233a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cbb9233a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cbb9233a

Branch: refs/heads/master
Commit: cbb9233a3b39ab8489d777fc76f0758c49b69bef
Parents: f9768af
Author: Sankar Hariappan <sank...@apache.org>
Authored: Fri Feb 23 22:00:23 2018 +0530
Committer: Sankar Hariappan <sank...@apache.org>
Committed: Fri Feb 23 22:00:23 2018 +0530

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/JavaUtils.java | 11 +-
 .../org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
 .../streaming/AbstractRecordWriter.java | 24 +-
 .../streaming/DelimitedInputWriter.java | 8 +-
 .../hive/hcatalog/streaming/HiveEndPoint.java | 77 +-
 .../hive/hcatalog/streaming/RecordWriter.java | 12 +-
 .../hcatalog/streaming/StrictJsonWriter.java | 8 +-
 .../hcatalog/streaming/StrictRegexWriter.java | 8 +-
 .../hcatalog/streaming/TransactionBatch.java | 47 +-
 .../streaming/mutate/client/AcidTable.java | 14 +-
 .../mutate/client/AcidTableSerializer.java | 10 +-
 .../streaming/mutate/client/MutatorClient.java | 17 +-
 .../mutate/worker/MutatorCoordinator.java | 22 +-
 .../streaming/mutate/worker/MutatorFactory.java | 3 +-
 .../streaming/mutate/worker/MutatorImpl.java | 18 +-
 .../mutate/worker/SequenceValidator.java | 14 +-
 .../hive/hcatalog/streaming/TestStreaming.java | 109 +-
 .../mutate/ReflectiveMutatorFactory.java | 4 +-
 .../streaming/mutate/StreamingAssert.java | 16 +-
 .../mutate/client/TestAcidTableSerializer.java | 6 +-
 .../mutate/client/TestMutatorClient.java | 12 +-
 .../mutate/worker/TestMutatorCoordinator.java | 26 +-
 .../mutate/worker/TestMutatorImpl.java | 14 +-
 .../apache/hadoop/hive/ql/TestAcidOnTez.java | 64 +-
 .../hive/ql/txn/compactor/TestCompactor.java | 106 +-
 .../upgrade/derby/044-HIVE-16997.derby.sql | 1 -
 .../upgrade/derby/050-HIVE-18192.derby.sql | 27 +
 .../derby/hive-txn-schema-3.0.0.derby.sql | 27 +-
 .../derby/upgrade-2.3.0-to-3.0.0.derby.sql | 1 +
 .../java/org/apache/hadoop/hive/ql/Driver.java | 107 +-
 .../hive/ql/exec/AbstractFileMergeOperator.java | 4 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 12 +-
 .../hadoop/hive/ql/exec/FetchOperator.java | 28 +-
 .../apache/hadoop/hive/ql/exec/FetchTask.java | 4 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java | 18 +-
 .../hadoop/hive/ql/exec/ImportCommitTask.java | 2 +-
 .../hadoop/hive/ql/exec/ImportCommitWork.java | 10 +-
 .../apache/hadoop/hive/ql/exec/MoveTask.java | 6 +-
 .../hadoop/hive/ql/exec/SMBMapJoinOperator.java | 1 +
 .../apache/hadoop/hive/ql/exec/Utilities.java | 8 +-
 .../hadoop/hive/ql/exec/mr/MapredLocalTask.java | 1 +
 .../hadoop/hive/ql/io/AcidInputFormat.java | 34 +-
 .../hadoop/hive/ql/io/AcidOutputFormat.java | 30 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 234 +-
 .../hadoop/hive/ql/io/HiveFileFormatUtils.java | 4 +-
 .../hadoop/hive/ql/io/HiveInputFormat.java | 42 +-
 .../hadoop/hive/ql/io/RecordIdentifier.java | 20 +-
 .../apache/hadoop/hive/ql/io/RecordUpdater.java | 13 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 41 +-
 .../hadoop/hive/ql/io/orc/OrcOutputFormat.java | 14 +-
 .../hive/ql/io/orc/OrcRawRecordMerger.java | 126 +-
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 92 +-
 .../io/orc/VectorizedOrcAcidRowBatchReader.java | 144 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 69 +-
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 14 +-
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 27 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java | 45 +-
 .../hive/ql/optimizer/GenMapRedUtils.java | 8 +-
 .../hive/ql/parse/DDLSemanticAnalyzer.java | 4 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java | 82 +-
 .../hive/ql/parse/LoadSemanticAnalyzer.java | 14 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 59 +-
 .../hadoop/hive/ql/plan/FileMergeDesc.java | 10 +-
 .../hadoop/hive/ql/plan/FileSinkDesc.java | 14 +-
 .../hadoop/hive/ql/plan/LoadTableDesc.java | 35 +-
 .../hadoop/hive/ql/plan/TableScanDesc.java | 12 +-
 .../hadoop/hive/ql/stats/ColStatsProcessor.java | 1 -
 .../hadoop/hive/ql/txn/compactor/Cleaner.java | 21 +-
 .../hive/ql/txn/compactor/CompactorMR.java | 54 +-
 .../hadoop/hive/ql/txn/compactor/Initiator.java | 27 +-
 .../hadoop/hive/ql/txn/compactor/Worker.java | 28 +-
 .../metastore/txn/TestCompactionTxnHandler.java | 13 +-
 .../hive/metastore/txn/TestTxnHandler.java | 12 +-
 .../apache/hadoop/hive/ql/TestTxnCommands.java | 34 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 36 +-
 .../apache/hadoop/hive/ql/TestTxnLoadData.java | 148 +-
 .../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 318 +-
 .../hive/ql/exec/TestFileSinkOperator.java | 18 +-
 .../hadoop/hive/ql/io/TestAcidInputFormat.java | 16 +-
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java | 73 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java | 26 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java | 129 +-
 .../hive/ql/io/orc/TestOrcRecordUpdater.java | 16 +-
 .../TestVectorizedOrcAcidRowBatchReader.java | 14 +-
 .../hive/ql/lockmgr/TestDbTxnManager2.java | 109 +-
 ...TestGenMapRedUtilsCreateConditionalTask.java | 2 +-
 .../parse/TestUpdateDeleteSemanticAnalyzer.java | 1 +
 .../hive/ql/txn/compactor/CompactorTest.java | 39 +-
 .../hive/ql/txn/compactor/TestCleaner.java | 30 +-
 .../hive/ql/txn/compactor/TestInitiator.java | 50 +-
 .../hive/ql/txn/compactor/TestWorker.java | 45 +-
 .../results/clientpositive/acid_nullscan.q.out | 8 +-
 .../clientpositive/acid_table_stats.q.out | 14 +-
 .../clientpositive/autoColumnStats_4.q.out | 2 +-
 .../llap/acid_bucket_pruning.q.out | 4 +-
 .../test/results/clientpositive/row__id.q.out | 22 +-
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp | 3724 +++++++----
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.h | 292 +
 .../ThriftHiveMetastore_server.skeleton.cpp | 10 +
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 3341 ++++++----
 .../gen/thrift/gen-cpp/hive_metastore_types.h | 307 +-
 .../metastore/api/AddDynamicPartitions.java | 159 +-
 .../api/AllocateTableWriteIdsRequest.java | 640 ++
 .../api/AllocateTableWriteIdsResponse.java | 443 ++
 .../metastore/api/ClearFileMetadataRequest.java | 32 +-
 .../hive/metastore/api/ClientCapabilities.java | 32 +-
 .../hive/metastore/api/CompactionRequest.java | 44 +-
 .../hive/metastore/api/CreationMetadata.java | 32 +-
 .../hive/metastore/api/FireEventRequest.java | 32 +-
 .../metastore/api/GetAllFunctionsResponse.java | 36 +-
 .../api/GetFileMetadataByExprRequest.java | 32 +-
 .../api/GetFileMetadataByExprResult.java | 48 +-
 .../metastore/api/GetFileMetadataRequest.java | 32 +-
 .../metastore/api/GetFileMetadataResult.java | 44 +-
 .../hive/metastore/api/GetTablesRequest.java | 32 +-
 .../hive/metastore/api/GetTablesResult.java | 36 +-
 .../metastore/api/GetValidWriteIdsRequest.java | 539 ++
 .../metastore/api/GetValidWriteIdsResponse.java | 443 ++
 .../api/HeartbeatTxnRangeResponse.java | 64 +-
 .../metastore/api/InsertEventRequestData.java | 64 +-
 .../hadoop/hive/metastore/api/LockRequest.java | 36 +-
 .../hive/metastore/api/Materialization.java | 32 +-
 .../api/NotificationEventResponse.java | 36 +-
 .../metastore/api/PutFileMetadataRequest.java | 64 +-
 .../hive/metastore/api/ShowCompactResponse.java | 36 +-
 .../hive/metastore/api/ShowLocksResponse.java | 36 +-
 .../hive/metastore/api/TableValidWriteIds.java | 851 +++
 .../hive/metastore/api/ThriftHiveMetastore.java | 6010 ++++++++++++------
 .../hadoop/hive/metastore/api/TxnToWriteId.java | 482 ++
 .../hive/metastore/api/WMFullResourcePlan.java | 144 +-
 .../api/WMGetAllResourcePlanResponse.java | 36 +-
 .../WMGetTriggersForResourePlanResponse.java | 36 +-
 .../api/WMValidateResourcePlanResponse.java | 64 +-
 .../gen-php/metastore/ThriftHiveMetastore.php | 1949 ++++--
 .../src/gen/thrift/gen-php/metastore/Types.php | 1469 ++++-
 .../hive_metastore/ThriftHiveMetastore-remote | 14 +
 .../hive_metastore/ThriftHiveMetastore.py | 1397 ++--
 .../gen/thrift/gen-py/hive_metastore/ttypes.py | 1023 ++-
 .../gen/thrift/gen-rb/hive_metastore_types.rb | 136 +-
 .../gen/thrift/gen-rb/thrift_hive_metastore.rb | 137 +
 .../hadoop/hive/metastore/HiveMetaStore.java | 11 +
 .../hive/metastore/HiveMetaStoreClient.java | 38 +-
 .../hadoop/hive/metastore/IMetaStoreClient.java | 50 +-
 .../hive/metastore/txn/CompactionInfo.java | 18 +-
 .../metastore/txn/CompactionTxnHandler.java | 41 +-
 .../hadoop/hive/metastore/txn/TxnDbUtil.java | 49 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java | 430 +-
 .../hadoop/hive/metastore/txn/TxnStore.java | 33 +-
 .../hadoop/hive/metastore/txn/TxnUtils.java | 129 +-
 .../main/sql/derby/hive-schema-3.0.0.derby.sql | 27 +-
 .../sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql | 26 +
 .../main/sql/mssql/hive-schema-3.0.0.mssql.sql | 27 +-
 .../sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql | 26 +
 .../main/sql/mysql/hive-schema-3.0.0.mysql.sql | 26 +-
 .../sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql | 26 +
 .../sql/oracle/hive-schema-3.0.0.oracle.sql | 27 +-
 .../oracle/upgrade-2.3.0-to-3.0.0.oracle.sql | 26 +
 .../sql/postgres/hive-schema-3.0.0.postgres.sql | 27 +-
 .../upgrade-2.3.0-to-3.0.0.postgres.sql | 26 +
 .../src/main/thrift/hive_metastore.thrift | 50 +-
 .../hive/common/ValidCompactorTxnList.java | 89 -
 .../hive/common/ValidCompactorWriteIdList.java | 93 +
 .../hadoop/hive/common/ValidReadTxnList.java | 12 +-
 .../hive/common/ValidReaderWriteIdList.java | 254 +
 .../apache/hadoop/hive/common/ValidTxnList.java | 7 -
 .../hadoop/hive/common/ValidTxnWriteIdList.java | 101 +
 .../hadoop/hive/common/ValidWriteIdList.java | 118 +
 .../hive/common/TestValidCompactorTxnList.java | 134 -
 .../common/TestValidCompactorWriteIdList.java | 142 +
 .../hive/common/TestValidReaderWriteIdList.java | 120 +
 170 files changed, 21424 insertions(+), 8611 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
index 57afbf8..de0c283 100644
--- a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
@@ -150,14 +150,15 @@ public final class JavaUtils {
   public static String lockIdToString(long extLockId) {
     return "lockid:" + extLockId;
   }
-  /**
-   * Utility method for ACID to normalize logging info.
Matches - * org.apache.hadoop.hive.metastore.api.LockResponse#toString - */ + public static String txnIdToString(long txnId) { return "txnid:" + txnId; } + public static String writeIdToString(long writeId) { + return "writeid:" + writeId; + } + public static String txnIdsToString(List<Long> txnIds) { return "Transactions requested to be aborted: " + txnIds.toString(); } @@ -166,7 +167,7 @@ public final class JavaUtils { // prevent instantiation } - public static Long extractTxnId(Path file) { + public static Long extractWriteId(Path file) { String fileName = file.getName(); String[] parts = fileName.split("_", 4); // e.g. delta_0000001_0000001_0000 or base_0000022 if (parts.length < 2 || !(DELTA_PREFIX.equals(parts[0]) || BASE_PREFIX.equals(parts[0]))) { http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 169ddcb..0880a96 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1094,7 +1094,7 @@ public class HiveConf extends Configuration { HIVESCRIPTTRUNCATEENV("hive.script.operator.truncate.env", false, "Truncate each environment variable for external script in scripts operator to 20KB (to fit system limits)"), HIVESCRIPT_ENV_BLACKLIST("hive.script.operator.env.blacklist", - "hive.txn.valid.txns,hive.script.operator.env.blacklist", + "hive.txn.valid.txns,hive.txn.tables.valid.writeids,hive.txn.valid.writeids,hive.script.operator.env.blacklist", "Comma separated list of keys from the configuration file not to convert to environment " + "variables when invoking the script operator"), HIVE_STRICT_CHECKS_ORDERBY_NO_LIMIT("hive.strict.checks.orderby.no.limit", false, http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index 4ec10ad..924e233 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -73,8 +73,8 @@ public abstract class AbstractRecordWriter implements RecordWriter { private final AcidOutputFormat<?,?> outf; private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write. 
- private Long curBatchMinTxnId; - private Long curBatchMaxTxnId; + private Long curBatchMinWriteId; + private Long curBatchMaxWriteId; private static final class TableWriterPair { private final Table tbl; @@ -143,7 +143,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { * used to tag error msgs to provied some breadcrumbs */ String getWatermark() { - return partitionPath + " txnIds[" + curBatchMinTxnId + "," + curBatchMaxTxnId + "]"; + return partitionPath + " writeIds[" + curBatchMinWriteId + "," + curBatchMaxWriteId + "]"; } // return the column numbers of the bucketed columns private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) { @@ -207,15 +207,15 @@ public abstract class AbstractRecordWriter implements RecordWriter { /** * Creates a new record updater for the new batch - * @param minTxnId smallest Txnid in the batch - * @param maxTxnID largest Txnid in the batch + * @param minWriteId smallest writeid in the batch + * @param maxWriteID largest writeid in the batch * @throws StreamingIOFailure if failed to create record updater */ @Override - public void newBatch(Long minTxnId, Long maxTxnID) + public void newBatch(Long minWriteId, Long maxWriteID) throws StreamingIOFailure, SerializationError { - curBatchMinTxnId = minTxnId; - curBatchMaxTxnId = maxTxnID; + curBatchMinWriteId = minWriteId; + curBatchMaxWriteId = maxWriteID; updaters = new ArrayList<RecordUpdater>(totalBuckets); for (int bucket = 0; bucket < totalBuckets; bucket++) { updaters.add(bucket, null);//so that get(i) returns null rather than ArrayOutOfBounds @@ -265,7 +265,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { return bucketFieldData; } - private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID) + private RecordUpdater createRecordUpdater(int bucketId, Long minWriteId, Long maxWriteID) throws IOException, SerializationError { try { // Initialize table properties from the table parameters. 
This is required because the table @@ -278,8 +278,8 @@ public abstract class AbstractRecordWriter implements RecordWriter { .inspector(getSerde().getObjectInspector()) .bucket(bucketId) .tableProperties(tblProperties) - .minimumTransactionId(minTxnId) - .maximumTransactionId(maxTxnID) + .minimumWriteId(minWriteId) + .maximumWriteId(maxWriteID) .statementId(-1) .finalDestination(partitionPath)); } catch (SerDeException e) { @@ -292,7 +292,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { RecordUpdater recordUpdater = updaters.get(bucketId); if (recordUpdater == null) { try { - recordUpdater = createRecordUpdater(bucketId, curBatchMinTxnId, curBatchMaxTxnId); + recordUpdater = createRecordUpdater(bucketId, curBatchMinWriteId, curBatchMaxWriteId); } catch (IOException e) { String errMsg = "Failed creating RecordUpdater for " + getWatermark(); LOG.error(errMsg, e); http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java index 0a5492c..999c37e 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java @@ -255,16 +255,16 @@ public class DelimitedInputWriter extends AbstractRecordWriter { } @Override - public void write(long transactionId, byte[] record) + public void write(long writeId, byte[] record) throws SerializationError, StreamingIOFailure { try { byte[] orderedFields = reorderFields(record); Object encodedRow = encode(orderedFields); int bucket = getBucket(encodedRow); - getRecordUpdater(bucket).insert(transactionId, encodedRow); + getRecordUpdater(bucket).insert(writeId, encodedRow); } catch (IOException e) { - throw new StreamingIOFailure("Error writing record in transaction (" - + transactionId + ")", e); + throw new StreamingIOFailure("Error writing record in transaction write id (" + + writeId + ")", e); } } http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index 6793d09..90731dc 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; import org.apache.hadoop.hive.ql.DriverFactory; import org.apache.hadoop.hive.ql.IDriver; import org.apache.hadoop.hive.ql.session.SessionState; @@ -551,7 +552,7 @@ public class HiveEndPoint { private final IMetaStoreClient msClient; private final IMetaStoreClient heartbeaterMSClient; private final RecordWriter recordWriter; - private final List<Long> txnIds; + private final 
List<TxnToWriteId> txnToWriteIds; //volatile because heartbeat() may be in a "different" thread; updates of this are "piggybacking" private volatile int currentTxnIndex = -1; @@ -602,14 +603,19 @@ public class HiveEndPoint { this.recordWriter = recordWriter; this.agentInfo = agentInfo; - txnIds = openTxnImpl(msClient, user, numTxns, ugi); + List<Long> txnIds = openTxnImpl(msClient, user, numTxns, ugi); + txnToWriteIds = allocateWriteIdsImpl(msClient, txnIds, ugi); + assert(txnToWriteIds.size() == numTxns); + txnStatus = new TxnState[numTxns]; for(int i = 0; i < txnStatus.length; i++) { + assert(txnToWriteIds.get(i).getTxnId() == txnIds.get(i)); txnStatus[i] = TxnState.OPEN;//Open matches Metastore state } - this.state = TxnState.INACTIVE; - recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1)); + + // The Write Ids returned for the transaction batch is also sequential + recordWriter.newBatch(txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(numTxns-1).getWriteId()); success = true; } catch (TException e) { throw new TransactionBatchUnAvailable(endPt, e); @@ -632,12 +638,26 @@ public class HiveEndPoint { public Object run() throws Exception { return msClient.openTxns(user, numTxns).getTxn_ids(); } - }) ; + }); + } + + private List<TxnToWriteId> allocateWriteIdsImpl(final IMetaStoreClient msClient, + final List<Long> txnIds, UserGroupInformation ugi) + throws IOException, TException, InterruptedException { + if(ugi==null) { + return msClient.allocateTableWriteIdsBatch(txnIds, endPt.database, endPt.table); + } + return (List<TxnToWriteId>) ugi.doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { + return msClient.allocateTableWriteIdsBatch(txnIds, endPt.database, endPt.table); + } + }); } @Override public String toString() { - if (txnIds==null || txnIds.isEmpty()) { + if (txnToWriteIds==null || txnToWriteIds.isEmpty()) { return "{}"; } StringBuilder sb = new StringBuilder(" TxnStatus["); @@ -646,7 +666,11 @@ public class HiveEndPoint { sb.append(state == null ? "N" : state); } sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed)); - return "TxnIds=[" + txnIds.get(0) + "..." + txnIds.get(txnIds.size()-1) + return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId() + + "/" + txnToWriteIds.get(0).getWriteId() + + "..." + + txnToWriteIds.get(txnToWriteIds.size()-1).getTxnId() + + "/" + txnToWriteIds.get(txnToWriteIds.size()-1).getWriteId() + "] on endPoint = " + endPt + "; " + sb; } @@ -680,7 +704,8 @@ public class HiveEndPoint { private void beginNextTransactionImpl() throws TransactionError { state = TxnState.INACTIVE;//clear state from previous txn - if ( currentTxnIndex + 1 >= txnIds.size() ) { + + if ((currentTxnIndex + 1) >= txnToWriteIds.size()) { throw new InvalidTrasactionState("No more transactions available in" + " current batch for end point : " + endPt); } @@ -699,13 +724,25 @@ public class HiveEndPoint { } /** - * Get Id of currently open transaction + * Get Id of currently open transaction. * @return -1 if there is no open TX */ @Override public Long getCurrentTxnId() { - if(currentTxnIndex >= 0) { - return txnIds.get(currentTxnIndex); + if (currentTxnIndex >= 0) { + return txnToWriteIds.get(currentTxnIndex).getTxnId(); + } + return -1L; + } + + /** + * Get Id of currently open transaction. 
+ * @return -1 if there is no open TX + */ + @Override + public Long getCurrentWriteId() { + if (currentTxnIndex >= 0) { + return txnToWriteIds.get(currentTxnIndex).getWriteId(); } return -1L; } @@ -727,9 +764,9 @@ public class HiveEndPoint { @Override public int remainingTransactions() { if (currentTxnIndex>=0) { - return txnIds.size() - currentTxnIndex -1; + return txnToWriteIds.size() - currentTxnIndex -1; } - return txnIds.size(); + return txnToWriteIds.size(); } @@ -824,7 +861,7 @@ public class HiveEndPoint { private void writeImpl(Collection<byte[]> records) throws StreamingException { for (byte[] record : records) { - recordWriter.write(getCurrentTxnId(), record); + recordWriter.write(getCurrentWriteId(), record); } } @@ -869,7 +906,7 @@ public class HiveEndPoint { private void commitImpl() throws TransactionError, StreamingException { try { recordWriter.flush(); - msClient.commitTxn(txnIds.get(currentTxnIndex)); + msClient.commitTxn(txnToWriteIds.get(currentTxnIndex).getTxnId()); state = TxnState.COMMITTED; txnStatus[currentTxnIndex] = TxnState.COMMITTED; } catch (NoSuchTxnException e) { @@ -932,8 +969,8 @@ public class HiveEndPoint { int minOpenTxnIndex = Math.max(currentTxnIndex + (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0); for(currentTxnIndex = minOpenTxnIndex; - currentTxnIndex < txnIds.size(); currentTxnIndex++) { - msClient.rollbackTxn(txnIds.get(currentTxnIndex)); + currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) { + msClient.rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId()); txnStatus[currentTxnIndex] = TxnState.ABORTED; } currentTxnIndex--;//since the loop left it == txnId.size() @@ -960,15 +997,15 @@ public class HiveEndPoint { if(isClosed) { return; } - if(state != TxnState.OPEN && currentTxnIndex >= txnIds.size() - 1) { + if(state != TxnState.OPEN && currentTxnIndex >= txnToWriteIds.size() - 1) { //here means last txn in the batch is resolved but the close() hasn't been called yet so //there is nothing to heartbeat return; } //if here after commit()/abort() but before next beginNextTransaction(), currentTxnIndex still //points at the last txn which we don't want to heartbeat - Long first = txnIds.get(state == TxnState.OPEN ? currentTxnIndex : currentTxnIndex + 1); - Long last = txnIds.get(txnIds.size()-1); + Long first = txnToWriteIds.get(state == TxnState.OPEN ? 
currentTxnIndex : currentTxnIndex + 1).getTxnId(); + Long last = txnToWriteIds.get(txnToWriteIds.size()-1).getTxnId(); try { HeartbeatTxnRangeResponse resp = heartbeaterMSClient.heartbeatTxnRange(first, last); if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) { http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java index cddb8de..a9bcd9f 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java @@ -23,21 +23,21 @@ public interface RecordWriter { /** Writes using a hive RecordUpdater * - * @param transactionId the ID of the Txn in which the write occurs + * @param writeId the write ID of the table mapping to Txn in which the write occurs * @param record the record to be written */ - public void write(long transactionId, byte[] record) throws StreamingException; + void write(long writeId, byte[] record) throws StreamingException; /** Flush records from buffer. Invoked by TransactionBatch.commit() */ - public void flush() throws StreamingException; + void flush() throws StreamingException; /** Clear bufferred writes. Invoked by TransactionBatch.abort() */ - public void clear() throws StreamingException; + void clear() throws StreamingException; /** Acquire a new RecordUpdater. Invoked when * StreamingConnection.fetchTransactionBatch() is called */ - public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingException; + void newBatch(Long minWriteId, Long maxWriteID) throws StreamingException; /** Close the RecordUpdater. 
Invoked by TransactionBatch.close() */ - public void closeBatch() throws StreamingException; + void closeBatch() throws StreamingException; } http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java index 357c537..4d92c55 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java @@ -117,15 +117,15 @@ public class StrictJsonWriter extends AbstractRecordWriter { @Override - public void write(long transactionId, byte[] record) + public void write(long writeId, byte[] record) throws StreamingIOFailure, SerializationError { try { Object encodedRow = encode(record); int bucket = getBucket(encodedRow); - getRecordUpdater(bucket).insert(transactionId, encodedRow); + getRecordUpdater(bucket).insert(writeId, encodedRow); } catch (IOException e) { - throw new StreamingIOFailure("Error writing record in transaction(" - + transactionId + ")", e); + throw new StreamingIOFailure("Error writing record in transaction write id(" + + writeId + ")", e); } } http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java index 58db252..ae25662 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java @@ -124,15 +124,15 @@ public class StrictRegexWriter extends AbstractRecordWriter { @Override - public void write(long transactionId, byte[] record) + public void write(long writeId, byte[] record) throws StreamingIOFailure, SerializationError { try { Object encodedRow = encode(record); int bucket = getBucket(encodedRow); - getRecordUpdater(bucket).insert(transactionId, encodedRow); + getRecordUpdater(bucket).insert(writeId, encodedRow); } catch (IOException e) { - throw new StreamingIOFailure("Error writing record in transaction(" - + transactionId + ")", e); + throw new StreamingIOFailure("Error writing record in transaction write id(" + + writeId + ")", e); } } http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java index e5ad475..1208377 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java @@ -46,73 +46,80 @@ public interface TransactionBatch { } /** - * Activate the next available transaction in the current transaction batch + * Activate the next available transaction in the current transaction batch. 
* @throws StreamingException if not able to switch to next Txn * @throws InterruptedException if call in interrupted */ - public void beginNextTransaction() throws StreamingException, InterruptedException; + void beginNextTransaction() throws StreamingException, InterruptedException; /** - * Get Id of currently open transaction + * Get Id of currently open transaction. * @return transaction id */ - public Long getCurrentTxnId(); + Long getCurrentTxnId(); + + + /** + * Get write Id mapping to currently open transaction. + * @return write id + */ + Long getCurrentWriteId(); /** - * get state of current transaction + * get state of current transaction. */ - public TxnState getCurrentTransactionState(); + TxnState getCurrentTransactionState(); /** - * Commit the currently open transaction + * Commit the currently open transaction. * @throws StreamingException if there are errors committing * @throws InterruptedException if call in interrupted */ - public void commit() throws StreamingException, InterruptedException; + void commit() throws StreamingException, InterruptedException; /** - * Abort the currently open transaction + * Abort the currently open transaction. * @throws StreamingException if there are errors * @throws InterruptedException if call in interrupted */ - public void abort() throws StreamingException, InterruptedException; + void abort() throws StreamingException, InterruptedException; /** * Remaining transactions are the ones that are not committed or aborted or open. * Current open transaction is not considered part of remaining txns. * @return number of transactions remaining this batch. */ - public int remainingTransactions(); + int remainingTransactions(); /** - * Write record using RecordWriter + * Write record using RecordWriter. * @param record the data to be written * @throws StreamingException if there are errors when writing * @throws InterruptedException if call in interrupted */ - public void write(byte[] record) throws StreamingException, InterruptedException; + void write(byte[] record) throws StreamingException, InterruptedException; /** - * Write records using RecordWriter + * Write records using RecordWriter. * @throws StreamingException if there are errors when writing * @throws InterruptedException if call in interrupted */ - public void write(Collection<byte[]> records) throws StreamingException, InterruptedException; + void write(Collection<byte[]> records) throws StreamingException, InterruptedException; /** * Issues a heartbeat to hive metastore on the current and remaining txn ids - * to keep them from expiring + * to keep them from expiring. * @throws StreamingException if there are errors */ - public void heartbeat() throws StreamingException; + void heartbeat() throws StreamingException; /** - * Close the TransactionBatch + * Close the TransactionBatch. 
* @throws StreamingException if there are errors closing batch * @throws InterruptedException if call in interrupted */ - public void close() throws StreamingException, InterruptedException; - public boolean isClosed(); + void close() throws StreamingException, InterruptedException; + boolean isClosed(); } http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java index 5b371e3..50ba0c7 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java @@ -34,7 +34,7 @@ public class AcidTable implements Serializable { private final String tableName; private final boolean createPartitions; private final TableType tableType; - private long transactionId; + private long writeId; private Table table; @@ -48,10 +48,10 @@ public class AcidTable implements Serializable { /** * Returns {@code 0} until such a time that a {@link Transaction} has been acquired (when * {@link MutatorClient#newTransaction()} exits), at which point this will return the - * {@link Transaction#getTransactionId() transaction id}. + * write id. */ - public long getTransactionId() { - return transactionId; + public long getWriteId() { + return writeId; } public String getDatabaseName() { @@ -105,8 +105,8 @@ public class AcidTable implements Serializable { return table; } - void setTransactionId(long transactionId) { - this.transactionId = transactionId; + void setWriteId(long writeId) { + this.writeId = writeId; } void setTable(Table table) { @@ -123,7 +123,7 @@ public class AcidTable implements Serializable { public String toString() { return "AcidTable [databaseName=" + databaseName + ", tableName=" + tableName + ", createPartitions=" + createPartitions + ", tableType=" + tableType + ", outputFormatName=" + getOutputFormatName() - + ", totalBuckets=" + getTotalBuckets() + ", transactionId=" + transactionId + "]"; + + ", totalBuckets=" + getTotalBuckets() + ", writeId=" + writeId + "]"; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java index 32db5e3..98f9f40 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java @@ -54,10 +54,10 @@ public class AcidTableSerializer { data.writeUTF(table.getDatabaseName()); data.writeUTF(table.getTableName()); data.writeBoolean(table.createPartitions()); - if (table.getTransactionId() <= 0) { - LOG.warn("Transaction ID <= 0. The recipient is probably expecting a transaction ID."); + if (table.getWriteId() <= 0) { + LOG.warn("Write ID <= 0. 
The recipient is probably expecting a table write ID."); } - data.writeLong(table.getTransactionId()); + data.writeLong(table.getWriteId()); data.writeByte(table.getTableType().getId()); Table metaTable = table.getTable(); @@ -91,12 +91,12 @@ public class AcidTableSerializer { String databaseName = in.readUTF(); String tableName = in.readUTF(); boolean createPartitions = in.readBoolean(); - long transactionId = in.readLong(); + long writeId = in.readLong(); TableType tableType = TableType.valueOf(in.readByte()); int thriftLength = in.readInt(); table = new AcidTable(databaseName, tableName, createPartitions, tableType); - table.setTransactionId(transactionId); + table.setWriteId(writeId); Table metaTable = null; if (thriftLength > 0) { http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java index 645274e..8ba6cf6 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock; import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockFailureListener; import org.apache.thrift.TException; @@ -94,8 +95,22 @@ public class MutatorClient implements Closeable { throw new TransactionException("Not connected - cannot create transaction."); } Transaction transaction = new Transaction(metaStoreClient, lockOptions); + long txnId = transaction.getTransactionId(); for (AcidTable table : tables) { - table.setTransactionId(transaction.getTransactionId()); + try { + table.setWriteId(metaStoreClient.allocateTableWriteId(txnId, + table.getDatabaseName(), table.getTableName())); + } catch (TException ex) { + try { + metaStoreClient.rollbackTxn(txnId); + } catch (TException e) { + LOG.warn("Allocation of write id failed for table {} and rollback transaction {} failed due to {}", + AcidUtils.getFullTableName(table.getDatabaseName(), table.getTableName()), txnId, e.getMessage()); + } + throw new TransactionException("Unable to allocate table write ID for table " + + AcidUtils.getFullTableName(table.getDatabaseName(), table.getTableName()) + + " under txn " + txnId, ex); + } } LOG.debug("Created transaction {}", transaction); return transaction; http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java index ae23153..5e804d7 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java +++ 
b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java @@ -98,11 +98,11 @@ public class MutatorCoordinator implements Closeable, Flushable { } /** - * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId). + * We expect records grouped by (partitionValues,bucketId) and ordered by (origWriteId,rowId). * * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed * using the values in the record's bucketed columns. - * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origTxnId, rowId) + * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origWriteId, rowId) * sequence. * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already * been closed. @@ -120,11 +120,11 @@ public class MutatorCoordinator implements Closeable, Flushable { } /** - * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId). + * We expect records grouped by (partitionValues,bucketId) and ordered by (origWriteId,rowId). * * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed * using the values in the record's bucketed columns. - * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origTxnId, rowId) + * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origWriteId, rowId) * sequence. * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already * been closed. @@ -142,11 +142,11 @@ public class MutatorCoordinator implements Closeable, Flushable { } /** - * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId). + * We expect records grouped by (partitionValues,bucketId) and ordered by (origWriteId,rowId). * * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed * using the values in the record's bucketed columns. - * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origTxnId, rowId) + * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origWriteId, rowId) * sequence. * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already * been closed. @@ -229,9 +229,9 @@ public class MutatorCoordinator implements Closeable, Flushable { sequenceValidator.reset(); if (deleteDeltaIfExists) { // TODO: Should this be the concern of the mutator? - deleteDeltaIfExists(newPartitionPath, table.getTransactionId(), newBucketId); + deleteDeltaIfExists(newPartitionPath, table.getWriteId(), newBucketId); } - mutator = mutatorFactory.newMutator(outputFormat, table.getTransactionId(), newPartitionPath, newBucketId); + mutator = mutatorFactory.newMutator(outputFormat, table.getWriteId(), newPartitionPath, newBucketId); bucketId = newBucketId; partitionValues = newPartitionValues; partitionPath = newPartitionPath; @@ -282,12 +282,12 @@ public class MutatorCoordinator implements Closeable, Flushable { } /* A delta may be present from a previous failed task attempt. 
*/ - private void deleteDeltaIfExists(Path partitionPath, long transactionId, int bucketId) throws IOException { + private void deleteDeltaIfExists(Path partitionPath, long writeId, int bucketId) throws IOException { Path deltaPath = AcidUtils.createFilename(partitionPath, new AcidOutputFormat.Options(configuration) .bucket(bucketId) - .minimumTransactionId(transactionId) - .maximumTransactionId(transactionId)); + .minimumWriteId(writeId) + .maximumWriteId(writeId)); FileSystem fileSystem = deltaPath.getFileSystem(configuration); if (fileSystem.exists(deltaPath)) { LOG.info("Deleting existing delta path: {}", deltaPath); http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java index 22cd9b7..da7558f 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java @@ -24,7 +24,8 @@ import org.apache.hadoop.hive.ql.io.AcidOutputFormat; public interface MutatorFactory { - Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId) throws IOException; + Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long writeId, Path partitionPath, int bucketId) + throws IOException; RecordInspector newRecordInspector(); http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java index 05cf8b7..84c477f 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; /** Base {@link Mutator} implementation. Creates a suitable {@link RecordUpdater} and delegates mutation events. 
*/ public class MutatorImpl implements Mutator { - private final long transactionId; + private final long writeId; private final Path partitionPath; private final int bucketProperty; private final Configuration configuration; @@ -44,11 +44,11 @@ public class MutatorImpl implements Mutator { * @throws IOException */ public MutatorImpl(Configuration configuration, int recordIdColumn, ObjectInspector objectInspector, - AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketProperty) throws IOException { + AcidOutputFormat<?, ?> outputFormat, long writeId, Path partitionPath, int bucketProperty) throws IOException { this.configuration = configuration; this.recordIdColumn = recordIdColumn; this.objectInspector = objectInspector; - this.transactionId = transactionId; + this.writeId = writeId; this.partitionPath = partitionPath; this.bucketProperty = bucketProperty; @@ -57,17 +57,17 @@ public class MutatorImpl implements Mutator { @Override public void insert(Object record) throws IOException { - updater.insert(transactionId, record); + updater.insert(writeId, record); } @Override public void update(Object record) throws IOException { - updater.update(transactionId, record); + updater.update(writeId, record); } @Override public void delete(Object record) throws IOException { - updater.delete(transactionId, record); + updater.delete(writeId, record); } /** @@ -89,7 +89,7 @@ public class MutatorImpl implements Mutator { @Override public String toString() { - return "ObjectInspectorMutator [transactionId=" + transactionId + ", partitionPath=" + partitionPath + return "ObjectInspectorMutator [writeId=" + writeId + ", partitionPath=" + partitionPath + ", bucketId=" + bucketProperty + "]"; } @@ -101,8 +101,8 @@ public class MutatorImpl implements Mutator { new AcidOutputFormat.Options(configuration) .inspector(objectInspector) .bucket(bucketId) - .minimumTransactionId(transactionId) - .maximumTransactionId(transactionId) + .minimumWriteId(writeId) + .maximumWriteId(writeId) .recordIdColumn(recordIdColumn) .finalDestination(partitionPath) .statementId(-1)); http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java index 5cd8081..320b987 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java @@ -29,22 +29,22 @@ class SequenceValidator { private static final Logger LOG = LoggerFactory.getLogger(SequenceValidator.class); - private Long lastTxId; + private Long lastWriteId; private Long lastRowId; SequenceValidator() { } boolean isInSequence(RecordIdentifier recordIdentifier) { - if (lastTxId != null && recordIdentifier.getTransactionId() < lastTxId) { - LOG.debug("Non-sequential transaction ID. Expected >{}, recordIdentifier={}", lastTxId, recordIdentifier); + if (lastWriteId != null && recordIdentifier.getWriteId() < lastWriteId) { + LOG.debug("Non-sequential write ID. 
Expected >{}, recordIdentifier={}", lastWriteId, recordIdentifier); return false; - } else if (lastTxId != null && recordIdentifier.getTransactionId() == lastTxId && lastRowId != null + } else if (lastWriteId != null && recordIdentifier.getWriteId() == lastWriteId && lastRowId != null && recordIdentifier.getRowId() <= lastRowId) { LOG.debug("Non-sequential row ID. Expected >{}, recordIdentifier={}", lastRowId, recordIdentifier); return false; } - lastTxId = recordIdentifier.getTransactionId(); + lastWriteId = recordIdentifier.getWriteId(); lastRowId = recordIdentifier.getRowId(); return true; } @@ -53,14 +53,14 @@ class SequenceValidator { * Validator must be reset for each new partition and or bucket. */ void reset() { - lastTxId = null; + lastWriteId = null; lastRowId = null; LOG.debug("reset"); } @Override public String toString() { - return "SequenceValidator [lastTxId=" + lastTxId + ", lastRowId=" + lastRowId + "]"; + return "SequenceValidator [lastWriteId=" + lastWriteId + ", lastRowId=" + lastRowId + "]"; } } http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index da2ca72..b042049 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -45,7 +45,7 @@ import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.Validator; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; @@ -374,16 +374,16 @@ public class TestStreaming { Assert.assertEquals("", 0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID"); - Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar")); - Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000016_0000016_0000/bucket_00000")); - Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2")); - Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000")); - Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4")); - Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000")); - Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6")); - Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000")); - Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8")); - Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000")); + 
Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+ Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
+ Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
@@ -398,14 +398,14 @@ public class TestStreaming {
runWorker(conf);
rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
- Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
- Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
- Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
- Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
- Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
- Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
- Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
- Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
}
/**
@@ -540,8 +540,8 @@ public class TestStreaming {
@Deprecated
private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles, String... records) throws Exception {
- ValidTxnList txns = msClient.getValidTxns();
- AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
+ ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -555,11 +555,11 @@ public class TestStreaming {
long min = Long.MAX_VALUE;
long max = Long.MIN_VALUE;
for (AcidUtils.ParsedDelta pd : current) {
- if (pd.getMaxTransaction() > max) {
- max = pd.getMaxTransaction();
+ if (pd.getMaxWriteId() > max) {
+ max = pd.getMaxWriteId();
}
- if (pd.getMinTransaction() < min) {
- min = pd.getMinTransaction();
+ if (pd.getMinWriteId() < min) {
+ min = pd.getMinWriteId();
}
}
Assert.assertEquals(minTxn, min);
@@ -573,7 +573,7 @@ public class TestStreaming {
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
AcidUtils.setAcidOperationalProperties(job, true, null);
job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
- job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+ job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
InputSplit[] splits = inf.getSplits(job, buckets);
Assert.assertEquals(numExpectedFiles, splits.length);
org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
@@ -593,7 +593,7 @@ public class TestStreaming {
*/
private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles, String validationQuery, boolean vectorize, String... records) throws Exception {
- ValidTxnList txns = msClient.getValidTxns();
+ ValidWriteIdList txns = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
@@ -608,11 +608,11 @@ public class TestStreaming {
long min = Long.MAX_VALUE;
long max = Long.MIN_VALUE;
for (AcidUtils.ParsedDelta pd : current) {
- if (pd.getMaxTransaction() > max) {
- max = pd.getMaxTransaction();
+ if (pd.getMaxWriteId() > max) {
+ max = pd.getMaxWriteId();
}
- if (pd.getMinTransaction() < min) {
- min = pd.getMinTransaction();
+ if (pd.getMinWriteId() < min) {
+ min = pd.getMinWriteId();
}
}
Assert.assertEquals(minTxn, min);
@@ -637,8 +637,8 @@ public class TestStreaming {
}
private void checkNothingWritten(Path partitionPath) throws Exception {
- ValidTxnList txns = msClient.getValidTxns();
- AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
+ ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -877,7 +877,7 @@ public class TestStreaming {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch.getCurrentTransactionState());
@@ -889,11 +889,11 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
// data should not be visible
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
txnBatch.commit();
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -945,7 +945,7 @@ public class TestStreaming {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch.getCurrentTransactionState());
@@ -957,11 +957,11 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
// data should not be visible
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
txnBatch.commit();
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -1007,7 +1007,7 @@ public class TestStreaming {
txnBatch.write(rec1.getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch.getCurrentTransactionState());
@@ -1134,7 +1134,7 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 14, 23, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -1153,13 +1153,13 @@ public class TestStreaming {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
- checkDataWritten2(partLoc, 15, 24, 1, validationQuery, false, "1\tHello streaming");
+ checkDataWritten2(partLoc, 1, 10, 1, validationQuery, false, "1\tHello streaming");
txnBatch.beginNextTransaction();
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten2(partLoc, 15, 24, 1, validationQuery, true, "1\tHello streaming",
+ checkDataWritten2(partLoc, 1, 10, 1, validationQuery, true, "1\tHello streaming",
"2\tWelcome to streaming");
txnBatch.close();
@@ -1170,14 +1170,14 @@ public class TestStreaming {
txnBatch.write("3,Hello streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten2(partLoc, 15, 40, 2, validationQuery, false, "1\tHello streaming",
+ checkDataWritten2(partLoc, 1, 20, 2, validationQuery, false, "1\tHello streaming",
"2\tWelcome to streaming", "3\tHello streaming - once again");
txnBatch.beginNextTransaction();
txnBatch.write("4,Welcome to streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten2(partLoc, 15, 40, 2, validationQuery, true, "1\tHello streaming",
+ checkDataWritten2(partLoc, 1, 20, 2, validationQuery, true, "1\tHello streaming",
"2\tWelcome to streaming", "3\tHello streaming - once again", "4\tWelcome to streaming - once again");
@@ -1214,14 +1214,15 @@ public class TestStreaming {
txnBatch2.commit();
String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
- checkDataWritten2(partLoc, 24, 33, 1,
+ checkDataWritten2(partLoc, 11, 20, 1,
validationQuery, true, "3\tHello streaming - once again");
txnBatch1.commit();
/*now both batches have committed (but not closed) so we for each primary file we expect a side file to exist and indicate the true length of primary file*/
FileSystem fs = partLoc.getFileSystem(conf);
- AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidTxns());
+ AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf,
+ msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
@@ -1234,7 +1235,7 @@ public class TestStreaming {
Assert.assertTrue("", logicalLength == actualLength);
}
}
- checkDataWritten2(partLoc, 14, 33, 2,
+ checkDataWritten2(partLoc, 1, 20, 2,
validationQuery, false,"1\tHello streaming", "3\tHello streaming - once again");
txnBatch1.beginNextTransaction();
@@ -1246,7 +1247,7 @@ public class TestStreaming {
//so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0
//has now received more data(logically - it's buffered) but it is not yet committed.
//lets check that side files exist, etc
- dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidTxns());
+ dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
@@ -1259,19 +1260,19 @@ public class TestStreaming {
Assert.assertTrue("", logicalLength <= actualLength);
}
}
- checkDataWritten2(partLoc, 14, 33, 2,
+ checkDataWritten2(partLoc, 1, 20, 2,
validationQuery, true,"1\tHello streaming", "3\tHello streaming - once again");
txnBatch1.commit();
- checkDataWritten2(partLoc, 14, 33, 2,
+ checkDataWritten2(partLoc, 1, 20, 2,
validationQuery, false, "1\tHello streaming", "2\tWelcome to streaming", "3\tHello streaming - once again");
txnBatch2.commit();
- checkDataWritten2(partLoc, 14, 33, 2,
+ checkDataWritten2(partLoc, 1, 20, 2,
validationQuery, true, "1\tHello streaming", "2\tWelcome to streaming", "3\tHello streaming - once again",
@@ -2281,8 +2282,8 @@ public class TestStreaming {
this.delegate = delegate;
}
@Override
- public void write(long transactionId, byte[] record) throws StreamingException {
- delegate.write(transactionId, record);
+ public void write(long writeId, byte[] record) throws StreamingException {
+ delegate.write(writeId, record);
produceFault();
}
@Override

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
index e057da7..c05ddcf 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
@@ -49,9 +49,9 @@ public class ReflectiveMutatorFactory implements MutatorFactory {
}
@Override
- public Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId)
+ public Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long writeId, Path partitionPath, int bucketId)
throws IOException {
- return new MutatorImpl(configuration, recordIdColumn, objectInspector, outputFormat, transactionId, partitionPath,
+ return new MutatorImpl(configuration, recordIdColumn, objectInspector, outputFormat, writeId, partitionPath,
bucketId);
}

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
index 873cddf..2aa8674 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -25,7 +25,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -71,7 +71,7 @@ public class StreamingAssert {
private List<String> partition;
private IMetaStoreClient metaStoreClient;
private Directory dir;
- private ValidTxnList txns;
+ private ValidWriteIdList writeIds;
private List<AcidUtils.ParsedDelta> currentDeltas;
private long min;
private long max;
@@ -83,9 +83,9 @@ public class StreamingAssert {
this.table = table;
this.partition = partition;
- txns = metaStoreClient.getValidTxns();
+ writeIds = metaStoreClient.getValidWriteIds(AcidUtils.getFullTableName(table.getDbName(), table.getTableName()));
partitionLocation = getPartitionLocation();
- dir = AcidUtils.getAcidState(partitionLocation, conf, txns);
+ dir = AcidUtils.getAcidState(partitionLocation, conf, writeIds);
assertEquals(0, dir.getObsolete().size());
assertEquals(0, dir.getOriginalFiles().size());
@@ -95,8 +95,8 @@ public class StreamingAssert {
System.out.println("Files found: ");
for (AcidUtils.ParsedDelta parsedDelta : currentDeltas) {
System.out.println(parsedDelta.getPath().toString());
- max = Math.max(parsedDelta.getMaxTransaction(), max);
- min = Math.min(parsedDelta.getMinTransaction(), min);
+ max = Math.max(parsedDelta.getMaxWriteId(), max);
+ min = Math.min(parsedDelta.getMinWriteId(), min);
}
}
@@ -145,7 +145,7 @@ public class StreamingAssert {
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
AcidUtils.setAcidOperationalProperties(job, true, null);
job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
- job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+ job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
InputSplit[] splits = inputFormat.getSplits(job, 1);
assertEquals(numSplitsExpected, splits.length);
@@ -160,7 +160,7 @@ public class StreamingAssert {
while (recordReader.next(key, value)) {
RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier();
- Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(),
+ Record record = new Record(new RecordIdentifier(recordIdentifier.getWriteId(),
recordIdentifier.getBucketProperty(), recordIdentifier.getRowId()), value.toString());
System.out.println(record);
records.add(record);

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java
index 7876e8d..1523a10 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java
@@ -45,7 +45,7 @@ public class TestAcidTableSerializer {
AcidTable acidTable = new AcidTable("db_1", "table_1", true, TableType.SINK);
acidTable.setTable(table);
- acidTable.setTransactionId(42L);
+ acidTable.setWriteId(42L);
String encoded = AcidTableSerializer.encode(acidTable);
System.out.println(encoded);
@@ -57,7 +57,7 @@ public class TestAcidTableSerializer {
assertThat(decoded.getOutputFormatName(), is("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"));
assertThat(decoded.getTotalBuckets(), is(10));
assertThat(decoded.getQualifiedName(), is("DB_1.TABLE_1"));
- assertThat(decoded.getTransactionId(), is(42L));
+ assertThat(decoded.getWriteId(), is(42L));
assertThat(decoded.getTableType(), is(TableType.SINK));
assertThat(decoded.getTable(), is(table));
}
@@ -75,7 +75,7 @@ public class TestAcidTableSerializer {
assertThat(decoded.getOutputFormatName(), is(nullValue()));
assertThat(decoded.getTotalBuckets(), is(0));
assertThat(decoded.getQualifiedName(), is("DB_1.TABLE_1"));
- assertThat(decoded.getTransactionId(), is(0L));
+ assertThat(decoded.getWriteId(), is(0L));
assertThat(decoded.getTableType(), is(TableType.SINK));
assertThat(decoded.getTable(), is(nullValue()));
}

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
index cfe3a96..91b90ed 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
@@ -48,6 +48,8 @@ import org.mockito.runners.MockitoJUnitRunner;
public class TestMutatorClient {
private static final long TRANSACTION_ID = 42L;
+ private static final long WRITE_ID1 = 78L;
+ private static final long WRITE_ID2 = 33L;
private static final String TABLE_NAME_1 = "TABLE_1";
private static final String TABLE_NAME_2 = "TABLE_2";
private static final String DB_NAME = "DB_1";
@@ -89,6 +91,8 @@ public class TestMutatorClient {
when(mockParameters.get("transactional")).thenReturn(Boolean.TRUE.toString());
when(mockMetaStoreClient.openTxn(USER)).thenReturn(TRANSACTION_ID);
+ when(mockMetaStoreClient.allocateTableWriteId(TRANSACTION_ID, DB_NAME, TABLE_NAME_1)).thenReturn(WRITE_ID1);
+ when(mockMetaStoreClient.allocateTableWriteId(TRANSACTION_ID, DB_NAME, TABLE_NAME_2)).thenReturn(WRITE_ID2);
client = new MutatorClient(mockMetaStoreClient, mockConfiguration, mockLockFailureListener, USER, Collections.singletonList(TABLE_1));
@@ -110,13 +114,13 @@ public class TestMutatorClient {
assertThat(outTables.get(0).getTableName(), is(TABLE_NAME_1));
assertThat(outTables.get(0).getTotalBuckets(), is(2));
assertThat(outTables.get(0).getOutputFormatName(), is(OrcOutputFormat.class.getName()));
- assertThat(outTables.get(0).getTransactionId(), is(0L));
+ assertThat(outTables.get(0).getWriteId(), is(0L));
assertThat(outTables.get(0).getTable(), is(mockTable1));
assertThat(outTables.get(1).getDatabaseName(), is(DB_NAME));
assertThat(outTables.get(1).getTableName(), is(TABLE_NAME_2));
assertThat(outTables.get(1).getTotalBuckets(), is(2));
assertThat(outTables.get(1).getOutputFormatName(), is(OrcOutputFormat.class.getName()));
- assertThat(outTables.get(1).getTransactionId(), is(0L));
+ assertThat(outTables.get(1).getWriteId(), is(0L));
assertThat(outTables.get(1).getTable(), is(mockTable2));
}
@@ -179,8 +183,8 @@ public class TestMutatorClient {
assertThat(transaction.getTransactionId(), is(TRANSACTION_ID));
assertThat(transaction.getState(), is(TxnState.INACTIVE));
- assertThat(outTables.get(0).getTransactionId(), is(TRANSACTION_ID));
- assertThat(outTables.get(1).getTransactionId(), is(TRANSACTION_ID));
+ assertThat(outTables.get(0).getWriteId(), is(WRITE_ID1));
+ assertThat(outTables.get(1).getWriteId(), is(WRITE_ID2));
}
@Test

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
index d897477..fab56b3 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
@@ -49,7 +49,7 @@ public class TestMutatorCoordinator {
private static final List<String> UNPARTITIONED = Collections.<String> emptyList();
private static final List<String> PARTITION_B = Arrays.asList("B");
private static final List<String> PARTITION_A = Arrays.asList("A");
- private static final long TRANSACTION_ID = 2L;
+ private static final long WRITE_ID = 2L;
private static final int BUCKET_ID = 0;
private static final Path PATH_A = new Path("X");
private static final Path PATH_B = new Path("B");
@@ -84,7 +84,7 @@ public class TestMutatorCoordinator {
public void createCoordinator() throws Exception {
when(mockAcidTable.getOutputFormatName()).thenReturn(OrcOutputFormat.class.getName());
when(mockAcidTable.getTotalBuckets()).thenReturn(1);
- when(mockAcidTable.getTransactionId()).thenReturn(TRANSACTION_ID);
+ when(mockAcidTable.getWriteId()).thenReturn(WRITE_ID);
when(mockAcidTable.createPartitions()).thenReturn(true);
when(mockMutatorFactory.newRecordInspector()).thenReturn(mockRecordInspector);
when(mockMutatorFactory.newBucketIdResolver(anyInt())).thenReturn(mockBucketIdResolver);
@@ -104,7 +104,7 @@ public class TestMutatorCoordinator {
coordinator.insert(UNPARTITIONED, RECORD);
verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
- verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+ verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
verify(mockMutator).insert(RECORD);
}
@@ -115,7 +115,7 @@ public class TestMutatorCoordinator {
coordinator.insert(UNPARTITIONED, RECORD);
verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
- verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+ verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
verify(mockMutator, times(3)).insert(RECORD);
}
@@ -129,8 +129,8 @@ public class TestMutatorCoordinator {
verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_A);
verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B);
- verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
- verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), eq(BUCKET_ID));
+ verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
+ verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_B), eq(BUCKET_ID));
verify(mockMutator, times(2)).insert(RECORD);
}
@@ -143,9 +143,9 @@ public class TestMutatorCoordinator {
coordinator.update(UNPARTITIONED, RECORD);
coordinator.delete(UNPARTITIONED, RECORD);
- verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+ verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
verify(mockMutatorFactory)
- .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID + 1));
+ .newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID + 1));
verify(mockMutator).update(RECORD);
verify(mockMutator).delete(RECORD);
}
@@ -166,11 +166,11 @@ public class TestMutatorCoordinator {
coordinator.update(PARTITION_B, RECORD); /* PbB1 */
verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B);
- verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
- verify(mockMutatorFactory, times(2)).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B),
+ verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
+ verify(mockMutatorFactory, times(2)).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_B),
eq(BUCKET_ID));
verify(mockMutatorFactory)
- .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), eq(BUCKET_ID + 1));
+ .newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_B), eq(BUCKET_ID + 1));
verify(mockMutator, times(2)).update(RECORD);
verify(mockMutator).delete(RECORD);
verify(mockMutator).insert(RECORD);
@@ -197,7 +197,7 @@ public class TestMutatorCoordinator {
coordinator.delete(UNPARTITIONED, RECORD);
verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
- verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+ verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
verify(mockMutator).update(RECORD);
verify(mockMutator).delete(RECORD);
}
@@ -210,7 +210,7 @@ public class TestMutatorCoordinator {
coordinator.delete(UNPARTITIONED, RECORD);
verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
- verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+ verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
verify(mockMutator).update(RECORD);
verify(mockMutator).delete(RECORD);
}
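Note for readers following the test changes above: this is a minimal, illustrative sketch (not part of the commit) of the read-side pattern the updated tests exercise. ACID directory state is now resolved against a table-level ValidWriteIdList obtained from the metastore, rather than the global ValidTxnList. The variables msClient, conf and partitionPath are assumed to be set up as in TestStreaming, and the db/table names are placeholders; the method names come from the diff above.

// Sketch only, assuming msClient, conf and partitionPath exist as in the tests above.
ValidWriteIdList writeIds =
    msClient.getValidWriteIds(AcidUtils.getFullTableName("default", "streamingnobuckets"));
AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
  // Delta directories are now bounded by per-table write ids instead of global transaction ids.
  System.out.println(pd.getPath() + " [" + pd.getMinWriteId() + ", " + pd.getMaxWriteId() + "]");
}

The writer-side analogue appears in the mutate API changes above, where AcidTable.getWriteId() (allocated through allocateTableWriteId on the metastore client) replaces getTransactionId() when constructing mutators.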