This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 730da48f65 HIVE-26107: Worker shouldn't inject duplicate entries in `ready for cleaning` state into the compaction queue (Laszlo Vegh, reviewed by Denys Kuzmenko, Karen Coppage)
730da48f65 is described below
commit 730da48f65ca2fb6ff771820a7a1fffae10ea7bd
Author: veghlaci05 <[email protected]>
AuthorDate: Mon May 2 11:20:36 2022 +0200
HIVE-26107: Worker shouldn't inject duplicate entries in `ready for cleaning` state into the compaction queue (Laszlo Vegh, reviewed by Denys Kuzmenko, Karen Coppage)
Closes #3172
---
.../org/apache/hadoop/hive/conf/Constants.java | 2 +
.../java/org/apache/hadoop/hive/ql/ErrorMsg.java | 1 +
.../org/apache/hadoop/hive/ql/TestAcidOnTez.java | 1 +
.../hive/ql/txn/compactor/TestCompactor.java | 16 +--
.../ql/txn/compactor/TestCrudCompactorOnTez.java | 69 +++++++++++--
.../TestFetchWriteIdFromInsertOnlyTables.java | 6 +-
.../ql/txn/compactor/TestMmCompactorOnTez.java | 2 +-
ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 10 +-
.../org/apache/hadoop/hive/ql/DriverContext.java | 9 ++
.../apache/hadoop/hive/ql/DriverTxnHandler.java | 3 +-
.../org/apache/hadoop/hive/ql/DriverUtils.java | 37 +++++--
.../compact/AlterTableCompactOperation.java | 9 ++
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 10 ++
.../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 6 ++
.../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 12 ++-
.../hadoop/hive/ql/stats/StatsUpdaterThread.java | 2 +-
.../hive/ql/txn/compactor/QueryCompactor.java | 6 +-
.../hadoop/hive/ql/txn/compactor/Worker.java | 2 +-
.../hadoop/hive/metastore/txn/TestTxnHandler.java | 4 +-
.../org/apache/hadoop/hive/ql/TestTxnCommands.java | 6 ++
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 34 ++++---
.../org/apache/hadoop/hive/ql/TestTxnLoadData.java | 10 +-
.../hive/ql/stats/TestStatsUpdaterThread.java | 2 +-
.../hadoop/hive/ql/txn/compactor/TestCleaner.java | 28 ++++--
.../ql/txn/compactor/TestCompactionMetrics.java | 13 ++-
.../clientpositive/acid_insert_overwrite_update.q | 1 -
.../queries/clientpositive/dbtxnmgr_compact1.q | 6 +-
.../queries/clientpositive/dbtxnmgr_compact3.q | 6 +-
.../llap/acid_insert_overwrite_update.q.out | 4 -
.../clientpositive/llap/dbtxnmgr_compact1.q.out | 20 +++-
.../clientpositive/llap/dbtxnmgr_compact3.q.out | 20 +++-
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 25 +++++
.../src/gen/thrift/gen-cpp/hive_metastore_types.h | 15 ++-
.../hive/metastore/api/CompactionResponse.java | 112 ++++++++++++++++++++-
.../gen-php/metastore/CompactionResponse.php | 24 +++++
.../src/gen/thrift/gen-py/hive_metastore/ttypes.py | 14 ++-
.../src/gen/thrift/gen-rb/hive_metastore_types.rb | 4 +-
.../src/main/thrift/hive_metastore.thrift | 3 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 26 +++--
39 files changed, 484 insertions(+), 96 deletions(-)
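
For context before the patch body: a minimal sketch of how the duplicate-request refusal introduced here surfaces through the metastore TxnStore API. The class name and the db/table names are illustrative; the calls mirror the tests in TestCrudCompactorOnTez and TestTxnHandler below.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;

public class CompactionRefusalSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    TxnStore txnHandler = TxnUtils.getTxnStore(conf);

    CompactionRequest rqst = new CompactionRequest("default", "compaction_test", CompactionType.MAJOR);
    // First request is accepted; after the compactor Worker runs, the queue entry
    // sits in 'ready for cleaning' until the Cleaner removes the obsolete files.
    CompactionResponse first = txnHandler.compact(rqst);

    // A second request for the same table before the Cleaner runs is now refused
    // instead of being enqueued as a duplicate.
    CompactionResponse second = txnHandler.compact(rqst);
    if (!second.isAccepted()) {
      // e.g. REFUSED state and
      // "Compaction is already scheduled with state='ready for cleaning' and id=" + first.getId()
      System.out.println(second.getState() + ": " + second.getErrormessage());
    }
  }
}
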
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index a62d8e8f13..b89cdf3fad 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -91,4 +91,6 @@ public class Constants {
public static final String ACID_FETCH_DELETED_ROWS =
"acid.fetch.deleted.rows";
public static final String INSERT_ONLY_FETCH_BUCKET_ID =
"insertonly.fetch.bucketid";
+
+  public static final String ERROR_MESSAGE_NO_DETAILS_AVAILABLE = "No detailed message available";
}
diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 749b76b41f..277e647557 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -477,6 +477,7 @@ public enum ErrorMsg {
TIME_TRAVEL_NOT_ALLOWED(10429, "Time travel is not allowed for {0}. Please
choose a storage format which supports the feature.", true),
INVALID_METADATA_TABLE_NAME(10430, "Invalid metadata table name {0}.", true),
METADATA_TABLE_NOT_SUPPORTED(10431, "Metadata tables are not supported for
table {0}.", true),
+  COMPACTION_REFUSED(10432, "Compaction request for {0}.{1}{2} is refused, details: {3}.", true),
  //========================== 20000 range starts here ========================//
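
A minimal sketch (not part of the patch) of how the new COMPACTION_REFUSED template gets filled in; the argument values are hypothetical and mirror the test added in TestCrudCompactorOnTez below.

import org.apache.hadoop.hive.ql.ErrorMsg;

public class CompactionRefusedMessageSketch {
  public static void main(String[] args) {
    // Placeholders: {0}=database, {1}=table, {2}=optional "(partition=...)" suffix, {3}=detail text.
    String msg = ErrorMsg.COMPACTION_REFUSED.format("default", "compaction_test", "",
        "Compaction is already scheduled with state='ready for cleaning' and id=1");
    System.out.println(msg);
  }
}
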
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index d4ce79b237..602c064ce5 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -556,6 +556,7 @@ public class TestAcidOnTez {
for(int i = 0; i < expectedDelDelta2.length; i++) {
Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found
on disk", expectedDelDelta2[i]);
}
+ runCleaner(hiveConf);
//run Major compaction
runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact
'major'", confForTez);
runWorker(hiveConf);
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 16b59576ef..0127d881c4 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -873,7 +873,7 @@ public class TestCompactor {
Assert.fail("Expecting 1 file \"base_0000004\" and found " +
stat.length + " files " + Arrays.toString(stat));
}
String name = stat[0].getPath().getName();
- Assert.assertEquals("base_0000005_v0000009", name);
+ Assert.assertEquals("base_0000004_v0000009", name);
CompactorTestUtil
.checkExpectedTxnsPresent(stat[0].getPath(), null,
columnNamesProperty, columnTypesProperty, 0, 1L, 4L, null,
1);
@@ -918,11 +918,11 @@ public class TestCompactor {
runMajorCompaction(dbName, tblName);
List<String> matchesNotFound = new ArrayList<>(5);
- matchesNotFound.add(AcidUtils.deleteDeltaSubdir(4, 5) +
VISIBILITY_PATTERN);
- matchesNotFound.add(AcidUtils.deltaSubdir(4, 5) + VISIBILITY_PATTERN);
+ matchesNotFound.add(AcidUtils.deleteDeltaSubdir(3, 4) +
VISIBILITY_PATTERN);
+ matchesNotFound.add(AcidUtils.deltaSubdir(3, 4) + VISIBILITY_PATTERN);
matchesNotFound.add(AcidUtils.deleteDeltaSubdir(5, 5, 0));
matchesNotFound.add(AcidUtils.deltaSubdir(5, 5, 1));
- matchesNotFound.add(AcidUtils.baseDir(6) + VISIBILITY_PATTERN);
+ matchesNotFound.add(AcidUtils.baseDir(5) + VISIBILITY_PATTERN);
IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
Table table = msClient.getTable(dbName, tblName);
@@ -1722,7 +1722,7 @@ public class TestCompactor {
msClient.abortTxns(Lists.newArrayList(openTxnId)); // Now abort 3.
runMajorCompaction(dbName, tblName); // Compact 4 and 5.
verifyFooBarResult(tblName, 2);
- verifyHasBase(table.getSd(), fs, "base_0000006_v0000017");
+ verifyHasBase(table.getSd(), fs, "base_0000005_v0000017");
runCleaner(conf);
// in case when we have # of accumulated entries for the same
table/partition - we need to process them one-by-one in ASC order of write_id's,
// however, to support multi-threaded processing in the Cleaner, we have
to move entries from the same group to the next Cleaner cycle,
@@ -1791,7 +1791,7 @@ public class TestCompactor {
verifyFooBarResult(tblName, 3);
verifyDeltaCount(p3.getSd(), fs, 1);
verifyHasBase(p1.getSd(), fs, "base_0000006_v0000010");
- verifyHasBase(p2.getSd(), fs, "base_0000007_v0000015");
+ verifyHasBase(p2.getSd(), fs, "base_0000006_v0000015");
executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds)
VALUES(1, 'foo', 2)", driver);
executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds)
VALUES(2, 'bar', 2)", driver);
@@ -1802,7 +1802,7 @@ public class TestCompactor {
verifyFooBarResult(tblName, 4);
verifyDeltaCount(p3.getSd(), fs, 1);
verifyHasBase(p1.getSd(), fs, "base_0000006_v0000010");
- verifyHasBase(p2.getSd(), fs, "base_0000007_v0000015");
+ verifyHasBase(p2.getSd(), fs, "base_0000006_v0000015");
}
@@ -2469,7 +2469,7 @@ public class TestCompactor {
files = fs.listStatus(new Path(table.getSd().getLocation()));
// base dir
assertEquals(1, files.length);
- assertEquals("base_0000004_v0000016", files[0].getPath().getName());
+ assertEquals("base_0000003_v0000016", files[0].getPath().getName());
files = fs.listStatus(files[0].getPath(), AcidUtils.bucketFileFilter);
// files
assertEquals(2, files.length);
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index f409ec2efe..ece4d5290e 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
@@ -51,6 +52,7 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StrictDelimitedInputWriter;
@@ -122,6 +125,52 @@ public class TestCrudCompactorOnTez extends
CompactorOnTezTest {
"ready for cleaning", compacts.get(0).getState());
}
+  @Test
+  public void secondCompactionShouldBeRefusedBeforeEnqueueing() throws Exception {
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+
+    final String dbName = "default";
+    final String tableName = "compaction_test";
+    executeStatementOnDriver("drop table if exists " + tableName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tableName + "(id string, value string) CLUSTERED BY(id) " +
+        "INTO 10 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('1','one'),('2','two'),('3','three')," +
+        "('4','four'),('5','five'),('6','six'),('7','seven'),('8','eight'),('9','nine'),('10','ten')," +
+        "('11','eleven'),('12','twelve'),('13','thirteen'),('14','fourteen'),('15','fifteen'),('16','sixteen')," +
+        "('17','seventeen'),('18','eighteen'),('19','nineteen'),('20','twenty')", driver);
+
+    executeStatementOnDriver("insert into " + tableName + " values ('21', 'value21'),('84', 'value84')," +
+        "('66', 'value66'),('54', 'value54')", driver);
+    executeStatementOnDriver("insert into " + tableName + " values ('22', 'value22'),('34', 'value34')," +
+        "('35', 'value35')", driver);
+    executeStatementOnDriver("insert into " + tableName + " values ('75', 'value75'),('99', 'value99')", driver);
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+
+    //Do a compaction directly and wait for it to finish
+    CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+    CompactionResponse resp = txnHandler.compact(rqst);
+    runWorker(conf);
+
+    //Try to do a second compaction on the same table before the cleaner runs.
+    try {
+      driver.run("ALTER TABLE " + tableName + " COMPACT 'major'");
+    } catch (CommandProcessorException e) {
+      String errorMessage = ErrorMsg.COMPACTION_REFUSED.format(dbName, tableName, "",
+          "Compaction is already scheduled with state='ready for cleaning' and id=" + resp.getId());
+      Assert.assertEquals(errorMessage, e.getCauseMessage());
+      Assert.assertEquals(ErrorMsg.COMPACTION_REFUSED.getErrorCode(), e.getErrorCode());
+    }
+
+    //Check if the first compaction is in 'ready for cleaning'
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+  }
+
   @Test
   public void testMinorCompactionShouldBeRefusedOnTablesWithOriginalFiles() throws Exception {
     conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
@@ -581,10 +630,10 @@ public class TestCrudCompactorOnTez extends
CompactorOnTezTest {
Collections.singletonList("base_0000005_v0000009"),
CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter,
table, partitionToday));
Assert.assertEquals("Base directory does not match after major compaction",
- Collections.singletonList("base_0000006_v0000014"),
+ Collections.singletonList("base_0000005_v0000014"),
CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter,
table, partitionTomorrow));
Assert.assertEquals("Base directory does not match after major compaction",
- Collections.singletonList("base_0000007_v0000019"),
+ Collections.singletonList("base_0000005_v0000019"),
CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter,
table, partitionYesterday));
// Check base dir contents
List<String> expectedBucketFiles = Arrays.asList("bucket_00000");
@@ -593,17 +642,17 @@ public class TestCrudCompactorOnTez extends
CompactorOnTezTest {
.getBucketFileNames(fs, table, partitionToday,
"base_0000005_v0000009"));
Assert.assertEquals("Bucket names are not matching after compaction",
expectedBucketFiles,
CompactorTestUtil
- .getBucketFileNames(fs, table, partitionTomorrow,
"base_0000006_v0000014"));
+ .getBucketFileNames(fs, table, partitionTomorrow,
"base_0000005_v0000014"));
Assert.assertEquals("Bucket names are not matching after compaction",
expectedBucketFiles,
CompactorTestUtil
- .getBucketFileNames(fs, table, partitionYesterday,
"base_0000007_v0000019"));
+ .getBucketFileNames(fs, table, partitionYesterday,
"base_0000005_v0000019"));
// Check buckets contents
Assert.assertEquals("post-compaction bucket 0", expectedRsBucket0,
testDataProvider.getBucketData(tblName, "536870912"));
// Check bucket file contents
checkBucketIdAndRowIdInAcidFile(fs, new Path(todayPath,
"base_0000005_v0000009"), 0);
- checkBucketIdAndRowIdInAcidFile(fs, new Path(tomorrowPath,
"base_0000006_v0000014"), 0);
- checkBucketIdAndRowIdInAcidFile(fs, new Path(yesterdayPath,
"base_0000007_v0000019"), 0);
+ checkBucketIdAndRowIdInAcidFile(fs, new Path(tomorrowPath,
"base_0000005_v0000014"), 0);
+ checkBucketIdAndRowIdInAcidFile(fs, new Path(yesterdayPath,
"base_0000005_v0000019"), 0);
CompactorTestUtilities.checkAcidVersion(fs.listFiles(new
Path(table.getSd().getLocation()), true), fs,
conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE), new
String[] { AcidUtils.BASE_PREFIX});
@@ -667,8 +716,8 @@ public class TestCrudCompactorOnTez extends
CompactorOnTezTest {
verifySuccessfulCompaction( 3);
// Verify base directories after compaction in each partition
String expectedBaseToday = "base_0000005_v0000011";
- String expectedBaseTomorrow = "base_0000006_v0000016";
- String expectedBaseYesterday = "base_0000007_v0000021";
+ String expectedBaseTomorrow = "base_0000005_v0000016";
+ String expectedBaseYesterday = "base_0000005_v0000021";
List<String> baseDeltasInToday =
CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter,
table, partitionToday);
Assert.assertEquals("Delta directories does not match after compaction",
@@ -1587,10 +1636,10 @@ public class TestCrudCompactorOnTez extends
CompactorOnTezTest {
Collections.singletonList("base_0000005_v0000009"),
CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter,
table, null));
Assert.assertEquals("Delta directories do not match after major
compaction",
- Collections.singletonList("delta_0000007_0000011_v0000021"),
+ Collections.singletonList("delta_0000006_0000010_v0000021"),
CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter,
table, null));
Assert.assertEquals("Delete delta directories does not match after minor
compaction",
- Collections.singletonList("delete_delta_0000007_0000011_v0000021"),
+ Collections.singletonList("delete_delta_0000006_0000010_v0000021"),
CompactorTestUtil.getBaseOrDeltaNames(fs,
AcidUtils.deleteEventDeltaDirFilter, table, null));
// Verify all contents
actualData = dataProvider.getAllData(tableName);
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestFetchWriteIdFromInsertOnlyTables.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestFetchWriteIdFromInsertOnlyTables.java
index 75cc0ff0aa..f393ec58e9 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestFetchWriteIdFromInsertOnlyTables.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestFetchWriteIdFromInsertOnlyTables.java
@@ -38,9 +38,9 @@ public class TestFetchWriteIdFromInsertOnlyTables extends
CompactorOnTezTest {
"0\t10\t10",
"0\t1\t1",
"0\t2\t20",
- "4\t2\t32",
- "4\t10\t15",
- "4\t42\t42"
+ "3\t2\t32",
+ "3\t10\t15",
+ "3\t42\t42"
);
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
index a8d7bc3105..841b6b85e4 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
@@ -429,7 +429,7 @@ public class TestMmCompactorOnTez extends
CompactorOnTezTest {
Collections.singletonList("base_0000003_v0000007"),
CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter,
table, null));
Assert.assertEquals("Delta directories does not match after minor
compaction",
- Collections.singletonList("delta_0000005_0000007_v0000017"),
+ Collections.singletonList("delta_0000004_0000006_v0000017"),
CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter,
table, null));
verifyAllContents(tableName, dataProvider, expectedData);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index a475b4eb2b..c2ecbf46bf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -106,13 +106,17 @@ public class Driver implements IDriver {
this(queryState, queryInfo, null);
}
- public Driver(QueryState queryState, QueryInfo queryInfo, HiveTxnManager
txnManager,
- ValidWriteIdList compactionWriteIds, long compactorTxnId) {
- this(queryState, queryInfo, txnManager);
+ public Driver(QueryState queryState, ValidWriteIdList compactionWriteIds,
long compactorTxnId) {
+ this(queryState);
driverContext.setCompactionWriteIds(compactionWriteIds);
driverContext.setCompactorTxnId(compactorTxnId);
}
+ public Driver(QueryState queryState, long analyzeTableWriteId) {
+ this(queryState);
+ driverContext.setAnalyzeTableWriteId(analyzeTableWriteId);
+ }
+
public Driver(QueryState queryState, QueryInfo queryInfo, HiveTxnManager
txnManager) {
driverContext = new DriverContext(queryState, queryInfo, new
HookRunner(queryState.getConf(), CONSOLE),
txnManager);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index b901825154..60f08632d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -70,6 +70,7 @@ public class DriverContext {
private CacheEntry usedCacheEntry;
private ValidWriteIdList compactionWriteIds = null;
private long compactorTxnId = 0;
+ private long analyzeTableWriteId = 0;
private Context backupContext = null;
private boolean retrial = false;
@@ -216,6 +217,14 @@ public class DriverContext {
this.compactorTxnId = compactorTxnId;
}
+ public long getAnalyzeTableWriteId() {
+ return analyzeTableWriteId;
+ }
+
+ public void setAnalyzeTableWriteId(long analyzeTableWriteId) {
+ this.analyzeTableWriteId = analyzeTableWriteId;
+ }
+
public Context getBackupContext() {
return backupContext;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java
index 4c925ad9e6..9fdd4ef37e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java
@@ -304,7 +304,8 @@ class DriverTxnHandler {
private void allocateWriteIdForAcidAnalyzeTable() throws LockException {
if (driverContext.getPlan().getAcidAnalyzeTable() != null) {
Table table = driverContext.getPlan().getAcidAnalyzeTable().getTable();
- driverContext.getTxnManager().getTableWriteId(table.getDbName(),
table.getTableName());
+ driverContext.getTxnManager().setTableWriteId(
+ table.getDbName(), table.getTableName(),
driverContext.getAnalyzeTableWriteId());
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
b/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
index 51ccfd91d8..948dff7c38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
@@ -46,26 +46,51 @@ public final class DriverUtils {
throw new UnsupportedOperationException("DriverUtils should not be
instantiated!");
}
- public static void runOnDriver(HiveConf conf, String user, SessionState
sessionState,
+ @FunctionalInterface
+ private interface DriverCreator {
+ Driver createDriver(QueryState qs);
+ }
+
+ public static void runOnDriver(HiveConf conf, SessionState sessionState,
String query) throws HiveException {
- runOnDriver(conf, user, sessionState, query, null, -1);
+ runOnDriver(conf, sessionState, query, null, -1);
}
/**
* For Query Based compaction to run the query to generate the compacted
data.
*/
- public static void runOnDriver(HiveConf conf, String user,
+ public static void runOnDriver(HiveConf conf,
SessionState sessionState, String query, ValidWriteIdList writeIds, long
compactorTxnId)
throws HiveException {
- if(writeIds != null && compactorTxnId < 0) {
+ if (writeIds != null && compactorTxnId < 0) {
throw new
IllegalArgumentException(JavaUtils.txnIdToString(compactorTxnId) +
" is not valid. Context: " + query);
}
+ runOnDriverInternal(query, conf, sessionState, (qs) -> new Driver(qs,
writeIds, compactorTxnId));
+ }
+
+  /**
+   * For Statistics gathering after compaction. Using this overload won't increment the writeid during stats gathering.
+   */
+  public static void runOnDriver(HiveConf conf, SessionState sessionState, String query, long analyzeTableWriteId)
+      throws HiveException {
+    if (analyzeTableWriteId < 0) {
+      throw new IllegalArgumentException(JavaUtils.txnIdToString(analyzeTableWriteId) +
+          " is not valid. Context: " + query);
+    }
+    runOnDriverInternal(query, conf, sessionState, (qs) -> new Driver(qs, analyzeTableWriteId));
+  }
+
+  private static void runOnDriverInternal(String query, HiveConf conf, SessionState sessionState, DriverCreator creator) throws HiveException {
SessionState.setCurrentSessionState(sessionState);
boolean isOk = false;
try {
- QueryState qs = new
QueryState.Builder().withHiveConf(conf).withGenerateNewQueryId(true).nonIsolated().build();
- Driver driver = new Driver(qs, null, null, writeIds, compactorTxnId);
+ Driver driver = creator.createDriver(
+ new QueryState.Builder()
+ .withHiveConf(conf)
+ .withGenerateNewQueryId(true)
+ .nonIsolated()
+ .build());
try {
try {
driver.run(query);
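
A short usage sketch of the two refactored DriverUtils entry points, assuming the caller has already set up the HiveConf, SessionState and write-id list; this mirrors how QueryCompactor and the Worker stats path call them in this patch.

import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;

public class DriverUtilsUsageSketch {
  // Compaction path: write ids plus a valid compactor txn id (see QueryCompactor below).
  static void runCompactionQuery(HiveConf conf, SessionState ss, String query,
      ValidWriteIdList writeIds, long compactorTxnId) throws HiveException {
    DriverUtils.runOnDriver(conf, ss, query, writeIds, compactorTxnId);
  }

  // Stats path: reuse the compaction's highest write id so the ANALYZE query
  // does not allocate and advance a new one (see the Worker change below).
  static void gatherStatsAfterCompaction(HiveConf conf, SessionState ss, String analyzeCmd,
      long highestWriteId) throws HiveException {
    DriverUtils.runOnDriver(conf, ss, analyzeCmd, highestWriteId);
  }
}
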
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
index 63c3ebb22f..477370b38d 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.ddl.table.storage.compact;
+import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -53,6 +54,14 @@ public class AlterTableCompactOperation extends
DDLOperation<AlterTableCompactDe
String partitionName = getPartitionName(table);
CompactionResponse resp = compact(table, partitionName);
+ if (!resp.isAccepted()) {
+ String message = Constants.ERROR_MESSAGE_NO_DETAILS_AVAILABLE;
+ if (resp.isSetErrormessage()) {
+ message = resp.getErrormessage();
+ }
+      throw new HiveException(ErrorMsg.COMPACTION_REFUSED,
+          table.getDbName(), table.getTableName(), partitionName == null ? "" : "(partition=" + partitionName + ")", message);
+ }
if (desc.isBlocking() && resp.isAccepted()) {
waitForCompactionToFinish(resp);
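
Illustrative only: what a caller of the SQL layer observes after this change when ALTER TABLE ... COMPACT is re-issued while a previous compaction is still in 'ready for cleaning'. The handling mirrors the new test above; the table name is hypothetical.

import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;

public class AlterTableCompactRefusalSketch {
  static void requestCompaction(IDriver driver) {
    try {
      driver.run("ALTER TABLE compaction_test COMPACT 'major'");
    } catch (CommandProcessorException e) {
      // Error code 10432 (COMPACTION_REFUSED); the cause message carries the
      // "Compaction is already scheduled with state='...' and id=..." detail.
      System.err.println(e.getErrorCode() + ": " + e.getCauseMessage());
    }
  }
}
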
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index fd1b508fd9..bce4ba7b92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -946,6 +946,16 @@ public final class DbTxnManager extends HiveTxnManagerImpl
{
return getTableWriteId(dbName, tableName, false);
}
+ @Override
+ public void setTableWriteId(String dbName, String tableName, long writeId)
throws LockException {
+ String fullTableName = AcidUtils.getFullTableName(dbName, tableName);
+ if (writeId > 0) {
+ tableWriteIds.put(fullTableName, writeId);
+ } else {
+ getTableWriteId(dbName, tableName);
+ }
+ }
+
private long getTableWriteId(
String dbName, String tableName, boolean allocateIfNotYet) throws
LockException {
String fullTableName = AcidUtils.getFullTableName(dbName, tableName);
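
A small sketch of the new setTableWriteId contract (per the javadoc added to HiveTxnManager below): a positive write id, e.g. the compaction's highestWriteId, is cached directly, while a non-positive value falls back to the old getTableWriteId() allocation. Names here are illustrative.

import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;

public class SetTableWriteIdSketch {
  static void seedWriteId(HiveTxnManager txnMgr, long highestWriteId) throws LockException {
    // highestWriteId > 0: cached for default.compaction_test, so a following ANALYZE
    // reuses it; highestWriteId <= 0: a write id is allocated as before.
    txnMgr.setTableWriteId("default", "compaction_test", highestWriteId);
  }
}
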
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index edd7e6dd8e..00ee1a1fec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -79,6 +79,12 @@ class DummyTxnManager extends HiveTxnManagerImpl {
public long getCurrentTxnId() {
return 0L;
}
+
+ @Override
+ public void setTableWriteId(String dbName, String tableName, long writeId)
throws LockException {
+
+ }
+
@Override
public int getStmtIdAndIncrement() {
return 0;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 88b00f7f45..24deac5928 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -302,7 +302,15 @@ public interface HiveTxnManager {
*/
long getCurrentTxnId();
- /**
+ /**
+ * if {@code writeId > 0}, sets it in the tableWriteId cache, otherwise,
calls {@link #getTableWriteId(String, String)}.
+ * @param dbName
+ * @param tableName
+ * @throws LockException
+ */
+ void setTableWriteId(String dbName, String tableName, long writeId) throws
LockException;
+
+ /**
* if {@code isTxnOpen()}, returns the table write ID associated with
current active transaction.
*/
long getTableWriteId(String dbName, String tableName) throws LockException;
@@ -316,7 +324,7 @@ public interface HiveTxnManager {
* @return 0 if not yet allocated
* @throws LockException
*/
- public long getAllocatedTableWriteId(String dbName, String tableName) throws
LockException;
+ long getAllocatedTableWriteId(String dbName, String tableName) throws
LockException;
/**
* Allocates write id for each transaction in the list.
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
index d69383e3b3..b91c726b1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
@@ -629,7 +629,7 @@ public class StatsUpdaterThread extends Thread implements
MetaStoreThread {
}
cmd = req.buildCommand();
LOG.debug("Running {} based on {}", cmd, req);
- DriverUtils.runOnDriver(conf, user, ss, cmd);
+ DriverUtils.runOnDriver(conf, ss, cmd);
} catch (Exception e) {
LOG.error("Analyze command failed: " + cmd, e);
try {
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index 263aaf893b..81a80745e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -110,7 +110,7 @@ abstract class QueryCompactor {
try {
LOG.info("Running {} compaction query into temp table with query:
{}",
compactionInfo.isMajorCompaction() ? "major" : "minor", query);
- DriverUtils.runOnDriver(conf, user, sessionState, query);
+ DriverUtils.runOnDriver(conf, sessionState, query);
} catch (Exception ex) {
Throwable cause = ex;
while (cause != null && !(cause instanceof AlreadyExistsException)) {
@@ -135,7 +135,7 @@ abstract class QueryCompactor {
conf.set("hive.optimize.bucketingsorting", "false");
conf.set("hive.vectorized.execution.enabled", "false");
}
- DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds,
compactorTxnId);
+ DriverUtils.runOnDriver(conf, sessionState, query, writeIds,
compactorTxnId);
}
commitCompaction(storageDescriptor.getLocation(), tmpTableName, conf,
writeIds, compactorTxnId);
} catch (HiveException e) {
@@ -147,7 +147,7 @@ abstract class QueryCompactor {
for (String query : dropQueries) {
LOG.info("Running {} compaction query into temp table with query:
{}",
compactionInfo.isMajorCompaction() ? "major" : "minor", query);
- DriverUtils.runOnDriver(conf, user, sessionState, query);
+ DriverUtils.runOnDriver(conf, sessionState, query);
}
} catch (HiveException e) {
LOG.error("Unable to drop temp table {} which was created for running
{} compaction", tmpTableName,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 7d6ad1bc55..69bff17d0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -197,7 +197,7 @@ public class Worker extends RemoteCompactorThread
implements MetaStoreThread {
statusUpdaterConf.set(TezConfiguration.TEZ_QUEUE_NAME,
compactionQueueName);
}
SessionState sessionState =
DriverUtils.setUpSessionState(statusUpdaterConf, userName, true);
- DriverUtils.runOnDriver(statusUpdaterConf, userName, sessionState,
sb.toString());
+ DriverUtils.runOnDriver(statusUpdaterConf, sessionState,
sb.toString(), ci.highestWriteId);
} catch (Throwable t) {
LOG.error(ci + ": gatherStats(" + ci.dbname + "," + ci.tableName + ","
+ ci.partName +
") failed due to: " + t.getMessage(), t);
diff --git
a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 13da8193fb..969f0a504d 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -1342,7 +1342,9 @@ public class TestTxnHandler {
rqst.setType(CompactionType.MINOR);
resp = txnHandler.compact(rqst);
-    Assert.assertEquals(resp, new CompactionResponse(1, TxnStore.INITIATED_RESPONSE, false));
+    Assert.assertFalse(resp.isAccepted());
+    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, resp.getState());
+    Assert.assertEquals("Compaction is already scheduled with state='initiated' and id=1", resp.getErrormessage());
rsp = txnHandler.showCompact(new ShowCompactRequest());
compacts = rsp.getCompacts();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 03703470b5..be717e303a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -520,6 +520,7 @@ public class TestTxnCommands extends
TxnCommandsBaseForTests {
@Test
public void testDDLsAdvancingWriteIds() throws Exception {
+ hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK,
true);
String tableName = "alter_table";
runStatementOnDriver("drop table if exists " + tableName);
@@ -558,6 +559,11 @@ public class TestTxnCommands extends
TxnCommandsBaseForTests {
validWriteIds = msClient.getValidWriteIds("default." +
tableName).toString();
Assert.assertEquals("default.alter_table:6:9223372036854775807::",
validWriteIds);
+    //Process the compaction request because otherwise the CONCATENATE (major compaction) command on the same table and
+ // partition would be refused.
+ runWorker(hiveConf);
+ runCleaner(hiveConf);
+
runStatementOnDriver(String.format("ALTER TABLE %s PARTITION
(ds='2013-04-05') CONCATENATE", tableName));
validWriteIds = msClient.getValidWriteIds("default." +
tableName).toString();
Assert.assertEquals("default.alter_table:7:9223372036854775807::",
validWriteIds);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index a7eb735c13..e5830be9ad 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -731,11 +732,11 @@ public class TestTxnCommands2 extends
TxnCommandsBaseForTests {
FileStatus[] buckets = fs.listStatus(status[i].getPath(),
FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
if (numDelta == 1) {
- Assert.assertEquals("delta_10000002_10000002_0000",
status[i].getPath().getName());
+ Assert.assertEquals("delta_10000001_10000001_0000",
status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001_0",
buckets[0].getPath().getName());
} else if (numDelta == 2) {
- Assert.assertEquals("delta_10000003_10000003_0000",
status[i].getPath().getName());
+ Assert.assertEquals("delta_10000002_10000002_0000",
status[i].getPath().getName());
Assert.assertEquals(1, buckets.length);
Assert.assertEquals("bucket_00000_0",
buckets[0].getPath().getName());
}
@@ -744,7 +745,7 @@ public class TestTxnCommands2 extends
TxnCommandsBaseForTests {
FileStatus[] buckets = fs.listStatus(status[i].getPath(),
FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
if (numDeleteDelta == 1) {
- Assert.assertEquals("delete_delta_10000002_10000002_0000",
status[i].getPath().getName());
+ Assert.assertEquals("delete_delta_10000001_10000001_0000",
status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001_0",
buckets[0].getPath().getName());
}
@@ -791,7 +792,7 @@ public class TestTxnCommands2 extends
TxnCommandsBaseForTests {
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else if (numBase == 2) {
// The new base dir now has two bucket files, since the delta dir
has two bucket files
- Assert.assertEquals("base_10000003_v0000031",
status[i].getPath().getName());
+ Assert.assertEquals("base_10000002_v0000031",
status[i].getPath().getName());
Assert.assertEquals(2, buckets.length);
Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
}
@@ -818,7 +819,7 @@ public class TestTxnCommands2 extends
TxnCommandsBaseForTests {
status = fs.listStatus(new Path(getWarehouseDir() + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()),
FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(1, status.length);
- Assert.assertEquals("base_10000003_v0000031",
status[0].getPath().getName());
+ Assert.assertEquals("base_10000002_v0000031",
status[0].getPath().getName());
FileStatus[] buckets = fs.listStatus(status[0].getPath(),
FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
Assert.assertEquals(2, buckets.length);
@@ -922,11 +923,11 @@ public class TestTxnCommands2 extends
TxnCommandsBaseForTests {
FileStatus[] buckets = fs.listStatus(parent,
FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
if (numDelta == 1) {
- Assert.assertEquals("delta_10000002_10000002_0000",
parent.getName());
+ Assert.assertEquals("delta_10000001_10000001_0000",
parent.getName());
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001_0",
buckets[0].getPath().getName());
} else if (numDelta == 2) {
- Assert.assertEquals("delta_10000003_10000003_0000",
parent.getName());
+ Assert.assertEquals("delta_10000002_10000002_0000",
parent.getName());
Assert.assertEquals(1, buckets.length);
Assert.assertEquals("bucket_00000_0",
buckets[0].getPath().getName());
}
@@ -935,7 +936,7 @@ public class TestTxnCommands2 extends
TxnCommandsBaseForTests {
FileStatus[] buckets = fs.listStatus(parent,
FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
if (numDeleteDelta == 1) {
- Assert.assertEquals("delete_delta_10000002_10000002_0000",
parent.getName());
+ Assert.assertEquals("delete_delta_10000001_10000001_0000",
parent.getName());
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001_0",
buckets[0].getPath().getName());
}
@@ -982,7 +983,7 @@ public class TestTxnCommands2 extends
TxnCommandsBaseForTests {
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else if (numBase == 2) {
// The new base dir now has two bucket files, since the delta dir
has two bucket files
- Assert.assertEquals("base_10000003_v0000031", parent.getName());
+ Assert.assertEquals("base_10000002_v0000031", parent.getName());
Assert.assertEquals(2, buckets.length);
Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
}
@@ -1008,7 +1009,7 @@ public class TestTxnCommands2 extends
TxnCommandsBaseForTests {
// Original bucket files, delta directories and previous base directory
should have been cleaned up. Only one base with 2 files.
status = listFilesByTable(fs, Table.NONACIDNESTEDPART);
Assert.assertEquals(2, status.length);
- Assert.assertEquals("base_10000003_v0000031",
status[0].getPath().getParent().getName());
+ Assert.assertEquals("base_10000002_v0000031",
status[0].getPath().getParent().getName());
FileStatus[] buckets = fs.listStatus(status[0].getPath().getParent(),
FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
Assert.assertEquals(2, buckets.length);
@@ -2061,6 +2062,7 @@ public class TestTxnCommands2 extends
TxnCommandsBaseForTests {
runStatementOnDriver("ALTER TABLE " + Table.ACIDTBL + " COMPACT 'MINOR'");
runWorker(hiveConf);
+ runCleaner(hiveConf);
r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by
a,b");
Assert.assertEquals(stringifyValues(rExpected), r);
@@ -2403,7 +2405,7 @@ public class TestTxnCommands2 extends
TxnCommandsBaseForTests {
txnHandler.cleanEmptyAbortedAndCommittedTxns();
txnHandler.cleanTxnToWriteIdTable();
Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from
TXN_TO_WRITE_ID"),
- 4, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from
TXN_TO_WRITE_ID"));
+ 3, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from
TXN_TO_WRITE_ID"));
// Commit the open txn, which lets the cleanup on TXN_TO_WRITE_ID.
txnMgr.commitTxn();
@@ -2532,11 +2534,11 @@ public class TestTxnCommands2 extends
TxnCommandsBaseForTests {
verifyDeltaDirAndResult(4, Table.MMTBL.toString(), "", resultData3);
verifyBaseDir(1, Table.MMTBL.toString(), "");
- // 7. Run one more Major compaction this should not have any affect
- runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MAJOR'");
- runWorker(hiveConf);
- verifyDeltaDirAndResult(4, Table.MMTBL.toString(), "", resultData3);
- verifyBaseDir(1, Table.MMTBL.toString(), "");
+    // 7. Run one more major compaction; this should be refused because the previous compaction is still in 'ready for cleaning'
+    CompactionResponse resp = txnHandler.compact(new CompactionRequest("default", Table.MMTBL.name.toLowerCase(), CompactionType.MAJOR));
+    Assert.assertFalse(resp.isAccepted());
+    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, resp.getState());
+    Assert.assertEquals("Compaction is already scheduled with state='ready for cleaning' and id=2", resp.getErrormessage());
runCleaner(hiveConf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
index c05002430f..caf2994cbe 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
@@ -235,8 +235,8 @@ public class TestTxnLoadData extends
TxnCommandsBaseForTests {
runStatementOnDriver("export table Tstage to '" + getWarehouseDir()
+"/2'");
runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/2/data'
overwrite into table T");
String[][] expected3 = new String[][] {
- {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
"t/base_0000005/000000_0"},
- {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":1}\t7\t8",
"t/base_0000005/000000_0"}};
+ {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
"t/base_0000004/000000_0"},
+ {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8",
"t/base_0000004/000000_0"}};
checkResult(expected3, testQuery, isVectorized, "load data inpath
overwrite");
//one more major compaction
@@ -244,9 +244,9 @@ public class TestTxnLoadData extends
TxnCommandsBaseForTests {
runStatementOnDriver("alter table T compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected4 = new String[][] {
- {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
"t/base_0000006_v0000040/bucket_00000"},
- {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":1}\t7\t8",
"t/base_0000006_v0000040/bucket_00000"},
- {"{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t6\t6",
"t/base_0000006_v0000040/bucket_00000"}};
+ {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
"t/base_0000005_v0000040/bucket_00000"},
+ {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8",
"t/base_0000005_v0000040/bucket_00000"},
+ {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t6\t6",
"t/base_0000005_v0000040/bucket_00000"}};
checkResult(expected4, testQuery, isVectorized, "load data inpath
overwrite (major)");
}
/**
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
index 452251e44b..498d841eab 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
@@ -776,7 +776,7 @@ public class TestStatsUpdaterThread {
}
private void executeQuery(String query) throws HiveException {
- DriverUtils.runOnDriver(hiveConf, ss.getUserName(), ss, query);
+ DriverUtils.runOnDriver(hiveConf, ss, query);
}
private StatsUpdaterThread createUpdater() throws MetaException {
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 8b31f068b1..90451553bb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
@@ -984,14 +985,17 @@ public class TestCleaner extends CompactorTest {
CompactionRequest rqst = new CompactionRequest(dbName, tableName,
CompactionType.MAJOR);
addBaseFile(t, null, 22L, 22);
compactInTxn(rqst);
- compactInTxn(rqst);
+
+ CompactionResponse response = txnHandler.compact(rqst);
+
+ Assert.assertFalse(response.isAccepted());
+ Assert.assertEquals("Compaction is already scheduled with state='ready for
cleaning' and id=1", response.getErrormessage());
startCleaner();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- Assert.assertEquals(2, rsp.getCompactsSize());
+ Assert.assertEquals(1, rsp.getCompactsSize());
Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE,
rsp.getCompacts().get(0).getState());
- Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE,
rsp.getCompacts().get(1).getState());
List<Path> paths = getDirectories(conf, t, null);
Assert.assertEquals(1, paths.size());
@@ -1012,15 +1016,16 @@ public class TestCleaner extends CompactorTest {
CompactionRequest rqst = new CompactionRequest(dbName, tableName,
CompactionType.MAJOR);
compactInTxn(rqst);
- compactInTxn(rqst);
+ CompactionResponse response = txnHandler.compact(rqst);
+
+ Assert.assertFalse(response.isAccepted());
+ Assert.assertEquals("Compaction is already scheduled with state='ready for
cleaning' and id=1", response.getErrormessage());
- startCleaner();
startCleaner();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- Assert.assertEquals(2, rsp.getCompactsSize());
+ Assert.assertEquals(1, rsp.getCompactsSize());
Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE,
rsp.getCompacts().get(0).getState());
- Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE,
rsp.getCompacts().get(1).getState());
List<Path> paths = getDirectories(conf, t, null);
Assert.assertEquals(1, paths.size());
@@ -1041,15 +1046,16 @@ public class TestCleaner extends CompactorTest {
CompactionRequest rqst = new CompactionRequest(dbName, tableName,
CompactionType.MAJOR);
compactInTxn(rqst);
- compactInTxn(rqst);
+ CompactionResponse response = txnHandler.compact(rqst);
+
+ Assert.assertFalse(response.isAccepted());
+ Assert.assertEquals("Compaction is already scheduled with state='ready for
cleaning' and id=1", response.getErrormessage());
- startCleaner();
startCleaner();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- Assert.assertEquals(2, rsp.getCompactsSize());
+ Assert.assertEquals(1, rsp.getCompactsSize());
Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE,
rsp.getCompacts().get(0).getState());
- Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE,
rsp.getCompacts().get(1).getState());
List<Path> paths = getDirectories(conf, t, null);
Assert.assertEquals(1, paths.size());
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
index 45a595ba53..e481f56698 100644
---
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
+++
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
@@ -402,6 +402,8 @@ public class TestCompactionMetrics extends CompactorTest {
MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.TIMER,
WORKER_CYCLE_KEY + "_" +
CompactionType.MINOR.toString().toLowerCase(), 1);
+ startCleaner();
+
rqst = new CompactionRequest("default", "mapwb", CompactionType.MAJOR);
rqst.setPartitionname("ds=today");
txnHandler.compact(rqst);
@@ -410,7 +412,16 @@ public class TestCompactionMetrics extends CompactorTest {
rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(2, rsp.getCompactsSize());
- Assert.assertEquals(TxnStore.CLEANING_RESPONSE,
rsp.getCompacts().get(0).getState());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts()
+        .stream()
+        .filter(c -> c.getType().equals(CompactionType.MINOR))
+        .findFirst()
+        .orElseThrow(() -> new RuntimeException("Could not find minor compaction")).getState());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts()
+        .stream()
+        .filter(c -> c.getType().equals(CompactionType.MAJOR))
+        .findFirst()
+        .orElseThrow(() -> new RuntimeException("Could not find major compaction")).getState());
json = metrics.dumpJson();
MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.TIMER,
diff --git a/ql/src/test/queries/clientpositive/acid_insert_overwrite_update.q
b/ql/src/test/queries/clientpositive/acid_insert_overwrite_update.q
index a370fa18e9..8d9e558202 100644
--- a/ql/src/test/queries/clientpositive/acid_insert_overwrite_update.q
+++ b/ql/src/test/queries/clientpositive/acid_insert_overwrite_update.q
@@ -26,7 +26,6 @@ insert overwrite table sequential_update
values(current_timestamp, 0, current_ti
delete from sequential_update where seq=2;
select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'),
regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update;
-alter table sequential_update compact 'major';
select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'),
regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update;
-- Check with load
diff --git a/ql/src/test/queries/clientpositive/dbtxnmgr_compact1.q
b/ql/src/test/queries/clientpositive/dbtxnmgr_compact1.q
index b4dc10d13a..864343d99f 100644
--- a/ql/src/test/queries/clientpositive/dbtxnmgr_compact1.q
+++ b/ql/src/test/queries/clientpositive/dbtxnmgr_compact1.q
@@ -3,8 +3,12 @@ set
hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
create table T1_n153(key string, val string) clustered by (val) into 2 buckets
stored as ORC TBLPROPERTIES ('transactional'='true');
+create table T2_n153(key string, val string) clustered by (val) into 2 buckets
stored as ORC TBLPROPERTIES ('transactional'='true');
+
alter table T1_n153 compact 'major';
-alter table T1_n153 compact 'minor';
+alter table T2_n153 compact 'minor';
drop table T1_n153;
+
+drop table T2_n153;
diff --git a/ql/src/test/queries/clientpositive/dbtxnmgr_compact3.q
b/ql/src/test/queries/clientpositive/dbtxnmgr_compact3.q
index c9e0a802da..37ea620e44 100644
--- a/ql/src/test/queries/clientpositive/dbtxnmgr_compact3.q
+++ b/ql/src/test/queries/clientpositive/dbtxnmgr_compact3.q
@@ -7,8 +7,12 @@ use D1;
create table T1_n71(key string, val string) clustered by (val) into 2 buckets
stored as ORC TBLPROPERTIES ('transactional'='true');
+create table T2_n71(key string, val string) clustered by (val) into 2 buckets
stored as ORC TBLPROPERTIES ('transactional'='true');
+
alter table T1_n71 compact 'major';
-alter table T1_n71 compact 'minor';
+alter table T2_n71 compact 'minor';
drop table T1_n71;
+
+drop table T2_n71;
\ No newline at end of file
diff --git
a/ql/src/test/results/clientpositive/llap/acid_insert_overwrite_update.q.out
b/ql/src/test/results/clientpositive/llap/acid_insert_overwrite_update.q.out
index 44e614c028..cbdd6c6cb1 100644
--- a/ql/src/test/results/clientpositive/llap/acid_insert_overwrite_update.q.out
+++ b/ql/src/test/results/clientpositive/llap/acid_insert_overwrite_update.q.out
@@ -122,10 +122,6 @@ POSTHOOK: type: QUERY
POSTHOOK: Input: default@sequential_update
#### A masked pattern was here ####
LOOKS OKAY base_0000005
-PREHOOK: query: alter table sequential_update compact 'major'
-PREHOOK: type: ALTERTABLE_COMPACT
-POSTHOOK: query: alter table sequential_update compact 'major'
-POSTHOOK: type: ALTERTABLE_COMPACT
PREHOOK: query: select distinct IF(seq==0, 'LOOKS OKAY', 'BROKEN'),
regexp_extract(INPUT__FILE__NAME, '.*/(.*)/[^/]*', 1) from sequential_update
PREHOOK: type: QUERY
PREHOOK: Input: default@sequential_update
diff --git a/ql/src/test/results/clientpositive/llap/dbtxnmgr_compact1.q.out
b/ql/src/test/results/clientpositive/llap/dbtxnmgr_compact1.q.out
index cebcb15981..a234abae51 100644
--- a/ql/src/test/results/clientpositive/llap/dbtxnmgr_compact1.q.out
+++ b/ql/src/test/results/clientpositive/llap/dbtxnmgr_compact1.q.out
@@ -6,13 +6,21 @@ POSTHOOK: query: create table T1_n153(key string, val string)
clustered by (val)
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@T1_n153
+PREHOOK: query: create table T2_n153(key string, val string) clustered by
(val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@T2_n153
+POSTHOOK: query: create table T2_n153(key string, val string) clustered by
(val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@T2_n153
PREHOOK: query: alter table T1_n153 compact 'major'
PREHOOK: type: ALTERTABLE_COMPACT
POSTHOOK: query: alter table T1_n153 compact 'major'
POSTHOOK: type: ALTERTABLE_COMPACT
-PREHOOK: query: alter table T1_n153 compact 'minor'
+PREHOOK: query: alter table T2_n153 compact 'minor'
PREHOOK: type: ALTERTABLE_COMPACT
-POSTHOOK: query: alter table T1_n153 compact 'minor'
+POSTHOOK: query: alter table T2_n153 compact 'minor'
POSTHOOK: type: ALTERTABLE_COMPACT
PREHOOK: query: drop table T1_n153
PREHOOK: type: DROPTABLE
@@ -22,3 +30,11 @@ POSTHOOK: query: drop table T1_n153
POSTHOOK: type: DROPTABLE
POSTHOOK: Input: default@t1_n153
POSTHOOK: Output: default@t1_n153
+PREHOOK: query: drop table T2_n153
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t2_n153
+PREHOOK: Output: default@t2_n153
+POSTHOOK: query: drop table T2_n153
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t2_n153
+POSTHOOK: Output: default@t2_n153
diff --git a/ql/src/test/results/clientpositive/llap/dbtxnmgr_compact3.q.out
b/ql/src/test/results/clientpositive/llap/dbtxnmgr_compact3.q.out
index 707548562f..bdbbaae963 100644
--- a/ql/src/test/results/clientpositive/llap/dbtxnmgr_compact3.q.out
+++ b/ql/src/test/results/clientpositive/llap/dbtxnmgr_compact3.q.out
@@ -18,13 +18,21 @@ POSTHOOK: query: create table T1_n71(key string, val
string) clustered by (val)
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: D1@T1_n71
POSTHOOK: Output: database:d1
+PREHOOK: query: create table T2_n71(key string, val string) clustered by (val)
into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: D1@T2_n71
+PREHOOK: Output: database:d1
+POSTHOOK: query: create table T2_n71(key string, val string) clustered by
(val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: D1@T2_n71
+POSTHOOK: Output: database:d1
PREHOOK: query: alter table T1_n71 compact 'major'
PREHOOK: type: ALTERTABLE_COMPACT
POSTHOOK: query: alter table T1_n71 compact 'major'
POSTHOOK: type: ALTERTABLE_COMPACT
-PREHOOK: query: alter table T1_n71 compact 'minor'
+PREHOOK: query: alter table T2_n71 compact 'minor'
PREHOOK: type: ALTERTABLE_COMPACT
-POSTHOOK: query: alter table T1_n71 compact 'minor'
+POSTHOOK: query: alter table T2_n71 compact 'minor'
POSTHOOK: type: ALTERTABLE_COMPACT
PREHOOK: query: drop table T1_n71
PREHOOK: type: DROPTABLE
@@ -34,3 +42,11 @@ POSTHOOK: query: drop table T1_n71
POSTHOOK: type: DROPTABLE
POSTHOOK: Input: d1@t1_n71
POSTHOOK: Output: d1@t1_n71
+PREHOOK: query: drop table T2_n71
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: d1@t2_n71
+PREHOOK: Output: d1@t2_n71
+POSTHOOK: query: drop table T2_n71
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: d1@t2_n71
+POSTHOOK: Output: d1@t2_n71
diff --git
a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index fe76dba19e..21e5c89883 100644
---
a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++
b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -27879,6 +27879,11 @@ void CompactionResponse::__set_state(const
std::string& val) {
void CompactionResponse::__set_accepted(const bool val) {
this->accepted = val;
}
+
+void CompactionResponse::__set_errormessage(const std::string& val) {
+ this->errormessage = val;
+__isset.errormessage = true;
+}
std::ostream& operator<<(std::ostream& out, const CompactionResponse& obj)
{
obj.printTo(out);
@@ -27934,6 +27939,14 @@ uint32_t
CompactionResponse::read(::apache::thrift::protocol::TProtocol* iprot)
xfer += iprot->skip(ftype);
}
break;
+ case 4:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->errormessage);
+ this->__isset.errormessage = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -27969,6 +27982,11 @@ uint32_t
CompactionResponse::write(::apache::thrift::protocol::TProtocol* oprot)
xfer += oprot->writeBool(this->accepted);
xfer += oprot->writeFieldEnd();
+ if (this->__isset.errormessage) {
+ xfer += oprot->writeFieldBegin("errormessage",
::apache::thrift::protocol::T_STRING, 4);
+ xfer += oprot->writeString(this->errormessage);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -27979,17 +27997,23 @@ void swap(CompactionResponse &a, CompactionResponse
&b) {
swap(a.id, b.id);
swap(a.state, b.state);
swap(a.accepted, b.accepted);
+ swap(a.errormessage, b.errormessage);
+ swap(a.__isset, b.__isset);
}
CompactionResponse::CompactionResponse(const CompactionResponse& other993) {
id = other993.id;
state = other993.state;
accepted = other993.accepted;
+ errormessage = other993.errormessage;
+ __isset = other993.__isset;
}
CompactionResponse& CompactionResponse::operator=(const CompactionResponse&
other994) {
id = other994.id;
state = other994.state;
accepted = other994.accepted;
+ errormessage = other994.errormessage;
+ __isset = other994.__isset;
return *this;
}
void CompactionResponse::printTo(std::ostream& out) const {
@@ -27998,6 +28022,7 @@ void CompactionResponse::printTo(std::ostream& out)
const {
out << "id=" << to_string(id);
out << ", " << "state=" << to_string(state);
out << ", " << "accepted=" << to_string(accepted);
+ out << ", " << "errormessage="; (__isset.errormessage ? (out <<
to_string(errormessage)) : (out << "<null>"));
out << ")";
}
diff --git
a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
index 63acb5b8b3..d731e6b097 100644
---
a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++
b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -10561,19 +10561,26 @@ void swap(CompactionMetricsDataRequest &a,
CompactionMetricsDataRequest &b);
std::ostream& operator<<(std::ostream& out, const
CompactionMetricsDataRequest& obj);
+typedef struct _CompactionResponse__isset {
+ _CompactionResponse__isset() : errormessage(false) {}
+ bool errormessage :1;
+} _CompactionResponse__isset;
class CompactionResponse : public virtual ::apache::thrift::TBase {
public:
CompactionResponse(const CompactionResponse&);
CompactionResponse& operator=(const CompactionResponse&);
- CompactionResponse() : id(0), state(), accepted(0) {
+ CompactionResponse() : id(0), state(), accepted(0), errormessage() {
}
virtual ~CompactionResponse() noexcept;
int64_t id;
std::string state;
bool accepted;
+ std::string errormessage;
+
+ _CompactionResponse__isset __isset;
void __set_id(const int64_t val);
@@ -10581,6 +10588,8 @@ class CompactionResponse : public virtual
::apache::thrift::TBase {
void __set_accepted(const bool val);
+ void __set_errormessage(const std::string& val);
+
bool operator == (const CompactionResponse & rhs) const
{
if (!(id == rhs.id))
@@ -10589,6 +10598,10 @@ class CompactionResponse : public virtual
::apache::thrift::TBase {
return false;
if (!(accepted == rhs.accepted))
return false;
+ if (__isset.errormessage != rhs.__isset.errormessage)
+ return false;
+ else if (__isset.errormessage && !(errormessage == rhs.errormessage))
+ return false;
return true;
}
bool operator != (const CompactionResponse &rhs) const {
diff --git
a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionResponse.java
b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionResponse.java
index bf0e58d414..ecbbbb691a 100644
---
a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionResponse.java
+++
b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionResponse.java
@@ -14,6 +14,7 @@ package org.apache.hadoop.hive.metastore.api;
private static final org.apache.thrift.protocol.TField ID_FIELD_DESC = new
org.apache.thrift.protocol.TField("id", org.apache.thrift.protocol.TType.I64,
(short)1);
private static final org.apache.thrift.protocol.TField STATE_FIELD_DESC =
new org.apache.thrift.protocol.TField("state",
org.apache.thrift.protocol.TType.STRING, (short)2);
private static final org.apache.thrift.protocol.TField ACCEPTED_FIELD_DESC =
new org.apache.thrift.protocol.TField("accepted",
org.apache.thrift.protocol.TType.BOOL, (short)3);
+ private static final org.apache.thrift.protocol.TField
ERRORMESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("errormessage",
org.apache.thrift.protocol.TType.STRING, (short)4);
private static final org.apache.thrift.scheme.SchemeFactory
STANDARD_SCHEME_FACTORY = new CompactionResponseStandardSchemeFactory();
private static final org.apache.thrift.scheme.SchemeFactory
TUPLE_SCHEME_FACTORY = new CompactionResponseTupleSchemeFactory();
@@ -21,12 +22,14 @@ package org.apache.hadoop.hive.metastore.api;
private long id; // required
private @org.apache.thrift.annotation.Nullable java.lang.String state; //
required
private boolean accepted; // required
+ private @org.apache.thrift.annotation.Nullable java.lang.String
errormessage; // optional
/** The set of fields this struct contains, along with convenience methods
for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
ID((short)1, "id"),
STATE((short)2, "state"),
- ACCEPTED((short)3, "accepted");
+ ACCEPTED((short)3, "accepted"),
+ ERRORMESSAGE((short)4, "errormessage");
private static final java.util.Map<java.lang.String, _Fields> byName = new
java.util.HashMap<java.lang.String, _Fields>();
@@ -48,6 +51,8 @@ package org.apache.hadoop.hive.metastore.api;
return STATE;
case 3: // ACCEPTED
return ACCEPTED;
+ case 4: // ERRORMESSAGE
+ return ERRORMESSAGE;
default:
return null;
}
@@ -92,6 +97,7 @@ package org.apache.hadoop.hive.metastore.api;
private static final int __ID_ISSET_ID = 0;
private static final int __ACCEPTED_ISSET_ID = 1;
private byte __isset_bitfield = 0;
+ private static final _Fields optionals[] = {_Fields.ERRORMESSAGE};
public static final java.util.Map<_Fields,
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap =
new java.util.EnumMap<_Fields,
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -101,6 +107,8 @@ package org.apache.hadoop.hive.metastore.api;
new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.ACCEPTED, new
org.apache.thrift.meta_data.FieldMetaData("accepted",
org.apache.thrift.TFieldRequirementType.REQUIRED,
new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
+ tmpMap.put(_Fields.ERRORMESSAGE, new
org.apache.thrift.meta_data.FieldMetaData("errormessage",
org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CompactionResponse.class,
metaDataMap);
}
@@ -131,6 +139,9 @@ package org.apache.hadoop.hive.metastore.api;
this.state = other.state;
}
this.accepted = other.accepted;
+ if (other.isSetErrormessage()) {
+ this.errormessage = other.errormessage;
+ }
}
public CompactionResponse deepCopy() {
@@ -144,6 +155,7 @@ package org.apache.hadoop.hive.metastore.api;
this.state = null;
setAcceptedIsSet(false);
this.accepted = false;
+ this.errormessage = null;
}
public long getId() {
@@ -214,6 +226,30 @@ package org.apache.hadoop.hive.metastore.api;
__isset_bitfield =
org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __ACCEPTED_ISSET_ID,
value);
}
+ @org.apache.thrift.annotation.Nullable
+ public java.lang.String getErrormessage() {
+ return this.errormessage;
+ }
+
+ public void setErrormessage(@org.apache.thrift.annotation.Nullable
java.lang.String errormessage) {
+ this.errormessage = errormessage;
+ }
+
+ public void unsetErrormessage() {
+ this.errormessage = null;
+ }
+
+ /** Returns true if field errormessage is set (has been assigned a value)
and false otherwise */
+ public boolean isSetErrormessage() {
+ return this.errormessage != null;
+ }
+
+ public void setErrormessageIsSet(boolean value) {
+ if (!value) {
+ this.errormessage = null;
+ }
+ }
+
public void setFieldValue(_Fields field,
@org.apache.thrift.annotation.Nullable java.lang.Object value) {
switch (field) {
case ID:
@@ -240,6 +276,14 @@ package org.apache.hadoop.hive.metastore.api;
}
break;
+ case ERRORMESSAGE:
+ if (value == null) {
+ unsetErrormessage();
+ } else {
+ setErrormessage((java.lang.String)value);
+ }
+ break;
+
}
}
@@ -255,6 +299,9 @@ package org.apache.hadoop.hive.metastore.api;
case ACCEPTED:
return isAccepted();
+ case ERRORMESSAGE:
+ return getErrormessage();
+
}
throw new java.lang.IllegalStateException();
}
@@ -272,6 +319,8 @@ package org.apache.hadoop.hive.metastore.api;
return isSetState();
case ACCEPTED:
return isSetAccepted();
+ case ERRORMESSAGE:
+ return isSetErrormessage();
}
throw new java.lang.IllegalStateException();
}
@@ -316,6 +365,15 @@ package org.apache.hadoop.hive.metastore.api;
return false;
}
+ boolean this_present_errormessage = true && this.isSetErrormessage();
+ boolean that_present_errormessage = true && that.isSetErrormessage();
+ if (this_present_errormessage || that_present_errormessage) {
+ if (!(this_present_errormessage && that_present_errormessage))
+ return false;
+ if (!this.errormessage.equals(that.errormessage))
+ return false;
+ }
+
return true;
}
@@ -331,6 +389,10 @@ package org.apache.hadoop.hive.metastore.api;
hashCode = hashCode * 8191 + ((accepted) ? 131071 : 524287);
+ hashCode = hashCode * 8191 + ((isSetErrormessage()) ? 131071 : 524287);
+ if (isSetErrormessage())
+ hashCode = hashCode * 8191 + errormessage.hashCode();
+
return hashCode;
}
@@ -372,6 +434,16 @@ package org.apache.hadoop.hive.metastore.api;
return lastComparison;
}
}
+ lastComparison = java.lang.Boolean.compare(isSetErrormessage(),
other.isSetErrormessage());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetErrormessage()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(this.errormessage, other.errormessage);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -408,6 +480,16 @@ package org.apache.hadoop.hive.metastore.api;
sb.append("accepted:");
sb.append(this.accepted);
first = false;
+ if (isSetErrormessage()) {
+ if (!first) sb.append(", ");
+ sb.append("errormessage:");
+ if (this.errormessage == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.errormessage);
+ }
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -489,6 +571,14 @@ package org.apache.hadoop.hive.metastore.api;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
}
break;
+ case 4: // ERRORMESSAGE
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.errormessage = iprot.readString();
+ struct.setErrormessageIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
}
@@ -513,6 +603,13 @@ package org.apache.hadoop.hive.metastore.api;
oprot.writeFieldBegin(ACCEPTED_FIELD_DESC);
oprot.writeBool(struct.accepted);
oprot.writeFieldEnd();
+ if (struct.errormessage != null) {
+ if (struct.isSetErrormessage()) {
+ oprot.writeFieldBegin(ERRORMESSAGE_FIELD_DESC);
+ oprot.writeString(struct.errormessage);
+ oprot.writeFieldEnd();
+ }
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -533,6 +630,14 @@ package org.apache.hadoop.hive.metastore.api;
oprot.writeI64(struct.id);
oprot.writeString(struct.state);
oprot.writeBool(struct.accepted);
+ java.util.BitSet optionals = new java.util.BitSet();
+ if (struct.isSetErrormessage()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.isSetErrormessage()) {
+ oprot.writeString(struct.errormessage);
+ }
}
@Override
@@ -544,6 +649,11 @@ package org.apache.hadoop.hive.metastore.api;
struct.setStateIsSet(true);
struct.accepted = iprot.readBool();
struct.setAcceptedIsSet(true);
+ java.util.BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.errormessage = iprot.readString();
+ struct.setErrormessageIsSet(true);
+ }
}
}
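
Editor's note: for reviewers unfamiliar with the Thrift-generated API, here is a minimal, hypothetical sketch of how a caller could surface the new optional field. It relies only on generated accessors (isAccepted, getState, plus the new isSetErrormessage/getErrormessage shown above); the helper class, method name, and fallback message are illustrative and not part of this patch.

    import org.apache.hadoop.hive.metastore.api.CompactionResponse;

    public final class CompactionResponseChecker {
      private CompactionResponseChecker() {}

      /** Throws if the metastore refused the compaction request, preferring the
       *  server-supplied errormessage (field 4) when it is present. */
      public static void ensureAccepted(CompactionResponse resp) {
        if (resp.isAccepted()) {
          return; // request was accepted and enqueued
        }
        String detail = resp.isSetErrormessage()
            ? resp.getErrormessage()
            : "no error details provided by the server"; // older servers never set field 4
        throw new IllegalStateException(
            "Compaction request refused (state=" + resp.getState() + "): " + detail);
      }
    }
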
diff --git
a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionResponse.php
b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionResponse.php
index 755bb4699e..a448808fd9 100644
---
a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionResponse.php
+++
b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionResponse.php
@@ -36,6 +36,11 @@ class CompactionResponse
'isRequired' => true,
'type' => TType::BOOL,
),
+ 4 => array(
+ 'var' => 'errormessage',
+ 'isRequired' => false,
+ 'type' => TType::STRING,
+ ),
);
/**
@@ -50,6 +55,10 @@ class CompactionResponse
* @var bool
*/
public $accepted = null;
+ /**
+ * @var string
+ */
+ public $errormessage = null;
public function __construct($vals = null)
{
@@ -63,6 +72,9 @@ class CompactionResponse
if (isset($vals['accepted'])) {
$this->accepted = $vals['accepted'];
}
+ if (isset($vals['errormessage'])) {
+ $this->errormessage = $vals['errormessage'];
+ }
}
}
@@ -106,6 +118,13 @@ class CompactionResponse
$xfer += $input->skip($ftype);
}
break;
+ case 4:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->errormessage);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
default:
$xfer += $input->skip($ftype);
break;
@@ -135,6 +154,11 @@ class CompactionResponse
$xfer += $output->writeBool($this->accepted);
$xfer += $output->writeFieldEnd();
}
+ if ($this->errormessage !== null) {
+ $xfer += $output->writeFieldBegin('errormessage', TType::STRING,
4);
+ $xfer += $output->writeString($this->errormessage);
+ $xfer += $output->writeFieldEnd();
+ }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
diff --git
a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index af706fc832..180b83cebd 100644
---
a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++
b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -15962,14 +15962,16 @@ class CompactionResponse(object):
- id
- state
- accepted
+ - errormessage
"""
- def __init__(self, id=None, state=None, accepted=None,):
+ def __init__(self, id=None, state=None, accepted=None, errormessage=None,):
self.id = id
self.state = state
self.accepted = accepted
+ self.errormessage = errormessage
def read(self, iprot):
if iprot._fast_decode is not None and isinstance(iprot.trans,
TTransport.CReadableTransport) and self.thrift_spec is not None:
@@ -15995,6 +15997,11 @@ class CompactionResponse(object):
self.accepted = iprot.readBool()
else:
iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.STRING:
+ self.errormessage = iprot.readString().decode('utf-8',
errors='replace') if sys.version_info[0] == 2 else iprot.readString()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -16017,6 +16024,10 @@ class CompactionResponse(object):
oprot.writeFieldBegin('accepted', TType.BOOL, 3)
oprot.writeBool(self.accepted)
oprot.writeFieldEnd()
+ if self.errormessage is not None:
+ oprot.writeFieldBegin('errormessage', TType.STRING, 4)
+ oprot.writeString(self.errormessage.encode('utf-8') if
sys.version_info[0] == 2 else self.errormessage)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -30964,6 +30975,7 @@ CompactionResponse.thrift_spec = (
(1, TType.I64, 'id', None, None, ), # 1
(2, TType.STRING, 'state', 'UTF8', None, ), # 2
(3, TType.BOOL, 'accepted', None, None, ), # 3
+ (4, TType.STRING, 'errormessage', 'UTF8', None, ), # 4
)
all_structs.append(ShowCompactRequest)
ShowCompactRequest.thrift_spec = (
diff --git
a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 1176606914..d1f7758719 100644
---
a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++
b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -4629,11 +4629,13 @@ class CompactionResponse
ID = 1
STATE = 2
ACCEPTED = 3
+ ERRORMESSAGE = 4
FIELDS = {
ID => {:type => ::Thrift::Types::I64, :name => 'id'},
STATE => {:type => ::Thrift::Types::STRING, :name => 'state'},
- ACCEPTED => {:type => ::Thrift::Types::BOOL, :name => 'accepted'}
+ ACCEPTED => {:type => ::Thrift::Types::BOOL, :name => 'accepted'},
+ ERRORMESSAGE => {:type => ::Thrift::Types::STRING, :name =>
'errormessage', :optional => true}
}
def struct_fields; FIELDS; end
diff --git
a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index ded286697f..dac4678886 100644
---
a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++
b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1334,7 +1334,8 @@ struct CompactionMetricsDataRequest {
struct CompactionResponse {
1: required i64 id,
2: required string state,
- 3: required bool accepted
+ 3: required bool accepted,
+ 4: optional string errormessage
}
struct ShowCompactRequest {
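
Editor's note: because errormessage is declared optional, the change stays wire-compatible: servers omit field 4 when it is unset, and older clients that only know fields 1-3 skip it. A minimal, hypothetical construction sketch follows; the refusedState argument stands in for whatever state constant the server reports (e.g. the REFUSED_RESPONSE referenced in the TxnHandler change below), and the method itself is illustrative only.

    import org.apache.hadoop.hive.metastore.api.CompactionResponse;

    // Required fields go through the generated constructor; the optional
    // errormessage is attached via its setter only when there is a reason to report.
    CompactionResponse buildRefused(String refusedState, String reason) {
      CompactionResponse resp = new CompactionResponse(-1L, refusedState, false);
      if (reason != null) {
        resp.setErrormessage(reason); // serialized as field 4 only when set
      }
      return resp;
    }
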
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index a94579fced..dabb554eea 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
@@ -171,6 +172,7 @@ import static org.apache.commons.lang3.StringUtils.repeat;
import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
import static
org.apache.hadoop.hive.metastore.txn.TxnUtils.executeQueriesInBatchNoCount;
import static
org.apache.hadoop.hive.metastore.txn.TxnUtils.executeQueriesInBatch;
+import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getFullTableName;
import static
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
import static
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
@@ -3704,12 +3706,21 @@ abstract class TxnHandler implements TxnStore,
TxnStore.MutexAPI {
long id = generateCompactionQueueId(stmt);
+ GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(
+ Collections.singletonList(getFullTableName(rqst.getDbname(),
rqst.getTablename())));
+ final ValidCompactorWriteIdList tblValidWriteIds =
+
TxnUtils.createValidCompactWriteIdList(getValidWriteIds(request).getTblValidWriteIds().get(0));
+ LOG.debug("ValidCompactWriteIdList: " +
tblValidWriteIds.writeToString());
+
List<String> params = new ArrayList<>();
StringBuilder sb = new StringBuilder("SELECT \"CQ_ID\", \"CQ_STATE\"
FROM \"COMPACTION_QUEUE\" WHERE").
- append(" \"CQ_STATE\" IN(").append(quoteChar(INITIATED_STATE)).
- append(",").append(quoteChar(WORKING_STATE)).
- append(") AND \"CQ_DATABASE\"=?").
- append(" AND \"CQ_TABLE\"=?").append(" AND ");
+ append(" (\"CQ_STATE\" IN(").
+
append(quoteChar(INITIATED_STATE)).append(",").append(quoteChar(WORKING_STATE)).
+ append(") OR (\"CQ_STATE\" =
").append(quoteChar(READY_FOR_CLEANING)).
+ append(" AND \"CQ_HIGHEST_WRITE_ID\" = ?))").
+ append(" AND \"CQ_DATABASE\"=?").
+ append(" AND \"CQ_TABLE\"=?").append(" AND ");
+ params.add(Long.toString(tblValidWriteIds.getHighWatermark()));
params.add(rqst.getDbname());
params.add(rqst.getTablename());
if(rqst.getPartitionname() == null) {
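
Editor's note: assembled from the string fragments above, the duplicate-detection query now reads roughly as follows (paraphrased; quoting and parameter binding are handled by sqlGenerator, and the partition predicate is appended just below depending on rqst.getPartitionname()):

    SELECT "CQ_ID", "CQ_STATE" FROM "COMPACTION_QUEUE"
    WHERE ("CQ_STATE" IN (INITIATED_STATE, WORKING_STATE)
           OR ("CQ_STATE" = READY_FOR_CLEANING AND "CQ_HIGHEST_WRITE_ID" = ?))
      AND "CQ_DATABASE" = ? AND "CQ_TABLE" = ? AND <partition predicate>

In other words, a new request is now also treated as a duplicate when a prior compaction of the same table/partition is already in "ready for cleaning" state at the current highest write id, since there is nothing new to compact, which is the situation HIVE-26107 addresses.
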
@@ -3720,7 +3731,7 @@ abstract class TxnHandler implements TxnStore,
TxnStore.MutexAPI {
}
pst = sqlGenerator.prepareStmtWithParameters(dbConn, sb.toString(),
params);
- LOG.debug("Going to execute query <" + sb.toString() + ">");
+ LOG.debug("Going to execute query <" + sb + ">");
ResultSet rs = pst.executeQuery();
if(rs.next()) {
long enqueuedId = rs.getLong(1);
@@ -3728,7 +3739,10 @@ abstract class TxnHandler implements TxnStore,
TxnStore.MutexAPI {
LOG.info("Ignoring request to compact " + rqst.getDbname() + "/" +
rqst.getTablename() +
"/" + rqst.getPartitionname() + " since it is already " +
quoteString(state) +
" with id=" + enqueuedId);
- return new CompactionResponse(enqueuedId, state, false);
+ CompactionResponse resp = new CompactionResponse(-1,
REFUSED_RESPONSE, false);
+ resp.setErrormessage("Compaction is already scheduled with state=" +
quoteString(state) +
+ " and id=" + enqueuedId);
+ return resp;
}
close(rs);
closeStmt(pst);