This is an automated email from the ASF dual-hosted git repository.
veghlaci05 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 29dc08172ba HIVE-27081: Revert HIVE-26717 and HIVE-26718 (Laszlo Vegh, reviewed by Laszlo Bodor)
29dc08172ba is described below
commit 29dc08172ba9ae3a3f51320f656994c20198c5f0
Author: veghlaci05 <[email protected]>
AuthorDate: Wed Feb 15 09:05:45 2023 +0100
HIVE-27081: Revert HIVE-26717 and HIVE-26718 (Laszlo Vegh, reviewed by Laszlo Bodor)
* Revert "HIVE-26717: Query based Rebalance compaction on insert-only tables (Laszlo Vegh, reviewed by Denys Kuzmenko, Krisztian Kasa)"
This reverts commit f2e908c5
* Revert "HIVE-26718: Enable initiator to schedule rebalancing compactions (Laszlo Vegh, reviewed by Krisztian Kasa)"
This reverts commit 919e734d3b67902a721afab1ca9a803855dbf2ec.
---
.../hive/ql/txn/compactor/CompactorTestUtil.java | 6 +-
.../ql/txn/compactor/TestCrudCompactorOnTez.java | 120 ---------------------
.../hive/ql/txn/compactor/CompactorFactory.java | 12 +--
.../hadoop/hive/ql/txn/compactor/Initiator.java | 51 +--------
.../hadoop/hive/ql/txn/compactor/StatsUpdater.java | 1 +
.../hadoop/hive/ql/txn/compactor/Worker.java | 21 +---
.../hive/ql/txn/compactor/TestInitiator.java | 70 ------------
.../hadoop/hive/metastore/conf/MetastoreConf.java | 9 --
.../hive/metastore/txn/CompactionTxnHandler.java | 18 ++--
9 files changed, 19 insertions(+), 289 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
index 8dd6087a66a..e157f641c56 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
@@ -102,13 +102,9 @@ class CompactorTestUtil {
*/
static List<String> getBucketFileNames(FileSystem fs, Table table, String partitionName, String deltaName)
throws IOException {
- boolean insertOnly = AcidUtils.isInsertOnlyTable(table.getParameters());
Path path = partitionName == null ? new Path(table.getSd().getLocation(), deltaName) : new Path(
new Path(table.getSd().getLocation()), new Path(partitionName, deltaName));
- return Arrays.stream(fs.listStatus(path, insertOnly ? AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter))
- .map(FileStatus::getPath)
- .map(Path::getName)
- .sorted()
+ return Arrays.stream(fs.listStatus(path, AcidUtils.bucketFileFilter)).map(FileStatus::getPath).map(Path::getName).sorted()
.collect(Collectors.toList());
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index c4d5fde2e12..dd5686ac84c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -37,14 +37,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
@@ -60,17 +58,11 @@ import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook;
import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents;
-import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
-import org.apache.hadoop.hive.ql.txn.compactor.Compactor;
-import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext;
-import org.apache.hadoop.hive.ql.txn.compactor.CompactorFactory;
-import org.apache.hadoop.hive.ql.txn.compactor.CompactorPipeline;
-import org.apache.hadoop.hive.ql.txn.compactor.Worker;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StrictDelimitedInputWriter;
@@ -84,7 +76,6 @@ import org.apache.orc.impl.RecordReaderImpl;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
import org.mockito.internal.util.reflection.FieldSetter;
import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.runWorker;
@@ -464,117 +455,6 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
}
}
- @Test
- public void testMMRebalanceCompactionOfNotPartitionedImplicitlyBucketedTable() throws Exception {
- conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
- conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
- conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
-
- //set grouping size to have 3 buckets, and re-create driver with the new config
- conf.set("tez.grouping.min-size", "200");
- conf.set("tez.grouping.max-size", "80000");
- driver = new Driver(conf);
-
- final String stageTableName = "stage_rebalance_test";
- final String tableName = "rebalance_test";
-
- TestDataProvider testDataProvider = new TestDataProvider();
- testDataProvider.createFullAcidTable(stageTableName, true, false);
- testDataProvider.insertTestDataPartitioned(stageTableName);
-
- executeStatementOnDriver("drop table if exists " + tableName, driver);
- executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int)
" +
- "STORED AS ORC TBLPROPERTIES('transactional'='true',
'transactional_properties'='insert_only')", driver);
- executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select
a, b from " + stageTableName, driver);
-
- //do some single inserts to have more data in the first bucket.
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('12',12)", driver);
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('13',13)", driver);
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('14',14)", driver);
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('15',15)", driver);
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('16',16)", driver);
- executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('17',17)", driver);
-
- Table table = msClient.getTable("default", tableName);
- FileSystem fs = FileSystem.get(conf);
-
- // Verify buckets and their content before rebalance
- Path basePath = new Path(table.getSd().getLocation(), "base_0000001");
- List<String> bucketFileNames = CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000001");
-
- Assert.assertEquals("Test setup does not match the expected: different
buckets",
- Arrays.asList("000000_0", "000001_0", "000002_0", "000003_0",
"000004_0", "000005_0", "000006_0"),
- bucketFileNames);
-
- Map<String, Integer> expectedRowCount = new HashMap<String, Integer>() {{
- put("000000_0", 4);
- put("000001_0", 2);
- put("000002_0", 1);
- put("000003_0", 2);
- put("000004_0", 1);
- put("000005_0", 1);
- put("000006_0", 1);
- }};
- for (String bucketFileName : bucketFileNames) {
- try(Reader reader = OrcFile.createReader(new Path(basePath, bucketFileName), OrcFile.readerOptions(conf))) {
- Assert.assertEquals("Row count in bucket " + bucketFileName + " does not match before compaction",
- (int) expectedRowCount.get(bucketFileName), (int) reader.getNumberOfRows());
- }
- }
-
- //Try to do a rebalancing compaction
- executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT
'rebalance'", driver);
-
- CompactorFactory factory = Mockito.spy(CompactorFactory.getInstance());
- AtomicReference<Compactor> reference = new AtomicReference<>();
- doAnswer(invocation -> {
- CompactorPipeline result = (CompactorPipeline)invocation.callRealMethod();
- // Use reflection to fetch inner compactors
- CompactorPipeline compactorPipeline = (CompactorPipeline) result;
- Field field = compactorPipeline.getClass().getDeclaredField("compactor");
- field.setAccessible(true);
- Compactor compactor = (Compactor) field.get(compactorPipeline);
- reference.set(compactor);
- return result;
- }).when(factory).getCompactorPipeline(any(), any(), any(), any());
-
-
- Worker worker = new Worker(factory);
- worker.setConf(conf);
- worker.init(new AtomicBoolean(true));
- worker.run();
-
- //Check if MmMajorQueryCompactor was used as fallback.
- Assert.assertEquals("Wrong compactor were chosen.",
MmMajorQueryCompactor.class, reference.get().getClass());
-
- //Check if the compaction succeed
- List<ShowCompactResponseElement> compactions = verifyCompaction(1, TxnStore.CLEANING_RESPONSE);
- Assert.assertTrue("no senor", compactions.get(0).getErrorMessage()
- .contains("Falling back to MAJOR compaction as REBALANCE compaction is
supported only on full-acid tables."));
-
- // Verify buckets and their content after rebalance
- basePath = new Path(table.getSd().getLocation(), "base_0000007_v0000017");
- bucketFileNames = CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000007_v0000017");
-
- Assert.assertEquals("Buckets does not match after compaction",
- Arrays.asList("000000_0", "000001_0", "000002_0", "000003_0", "000004_0"),
- bucketFileNames);
-
- expectedRowCount = new HashMap<String, Integer>() {{
- put("000000_0", 4);
- put("000001_0", 4);
- put("000002_0", 4);
- put("000003_0", 4);
- put("000004_0", 2);
- }};
- for (String bucketFileName : bucketFileNames) {
- try(Reader reader = OrcFile.createReader(new Path(basePath, bucketFileName), OrcFile.readerOptions(conf))) {
- Assert.assertEquals("Row count in bucket " + bucketFileName + " does not match before compaction",
- (int) expectedRowCount.get(bucketFileName), (int) reader.getNumberOfRows());
- }
- }
- }
-
@Test
public void testCompactionShouldNotFailOnPartitionsWithBooleanField() throws Exception {
conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorFactory.java
index ba8b2d207f3..786391c48e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorFactory.java
@@ -104,21 +104,13 @@ public final class CompactorFactory {
case MAJOR:
return new CompactorPipeline(new MergeCompactor())
.addCompactor(new MmMajorQueryCompactor());
- case REBALANCE:
- // REBALANCE COMPACTION on an insert-only table is simply a MAJOR compaction. Since there is no ACID row data,
- // there is no acid row order to keep, and the number of buckets cannot be set at all (it will be calculated
- // and created by TEZ dynamically). Initiator won't schedule REBALANCE compactions for insert-only tables,
- // however users can request it. In these cases we simply fall back to MAJOR compaction
- return new CompactorPipeline(new MmMajorQueryCompactor());
default:
throw new HiveException(
compactionInfo.type.name() + " compaction is not supported on insert only tables.");
}
- } else {
- throw new HiveException("Only transactional tables can be compacted, " +
table.getTableName() + "is not suitable " +
- "for compaction!");
}
- throw new HiveException("No suitable compactor found for table: " +
table.getTableName());
+ throw new HiveException("Only transactional tables can be compacted, " +
table.getTableName() + "is not suitable " +
+ "for compaction!");
}
}
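Note: the removed REBALANCE branch above meant that a rebalance request on an insert-only table silently became a query-based MAJOR compaction. A minimal sketch of that pre-revert dispatch (simplified from CompactorFactory; the method name and shape here are illustrative, not the actual signature):

    // Sketch of the reverted insert-only dispatch (simplified, hypothetical method).
    CompactorPipeline pipelineForInsertOnly(CompactionType type) throws HiveException {
      switch (type) {
        case MAJOR:
          // MAJOR tries a file merge first, then falls back to the query-based MM compactor.
          return new CompactorPipeline(new MergeCompactor()).addCompactor(new MmMajorQueryCompactor());
        case REBALANCE:
          // Pre-revert: no ACID row order to preserve on insert-only tables,
          // so REBALANCE degraded to a plain query-based MAJOR compaction.
          return new CompactorPipeline(new MmMajorQueryCompactor());
        default:
          throw new HiveException(type.name() + " compaction is not supported on insert only tables.");
      }
    }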
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 471f9fa88f7..ee7727b69ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -460,8 +460,8 @@ public class Initiator extends MetaStoreCompactorThread {
return AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false);
}
- private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir,
- Map<String, String> tblproperties, long baseSize, long deltaSize) {
+ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir, Map<String,
+ String> tblproperties, long baseSize, long deltaSize) {
boolean noBase = false;
List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
if (baseSize == 0 && deltaSize > 0) {
@@ -501,10 +501,6 @@ public class Initiator extends MetaStoreCompactorThread {
if (initiateMajor) return CompactionType.MAJOR;
}
- if (scheduleRebalance(ci, dir, tblproperties, baseSize, deltaSize)) {
- return CompactionType.REBALANCE;
- }
-
String deltaNumProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
int deltaNumThreshold = deltaNumProp == null ?
@@ -524,49 +520,6 @@ public class Initiator extends MetaStoreCompactorThread {
CompactionType.MAJOR : CompactionType.MINOR;
}
- private boolean scheduleRebalance(CompactionInfo ci, AcidDirectory dir, Map<String, String> tblproperties, long baseSize, long deltaSize) {
- // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
- // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
- // files shouldn't be too high
- if (AcidUtils.isFullAcidTable(tblproperties)) {
- // We check only full-acid tables, for insert-only tables a MAJOR compaction will also rebalance bucket data.
- long totalSize = baseSize + deltaSize;
- long minimumSize = MetastoreConf.getLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE);
- if (totalSize > minimumSize) {
- try {
- Map<Integer, Long> bucketSizes = new HashMap<>();
- //compute the size of each bucket
- dir.getFiles().stream()
- .filter(f -> AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
- .forEach(
- f -> bucketSizes.merge(
- AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
- f.getHdfsFileStatusWithId().getFileStatus().getLen(),
- Long::sum));
- final double mean = (double) totalSize / bucketSizes.size();
-
- // calculate the standard deviation
- double standardDeviation = Math.sqrt(
- bucketSizes.values().stream().mapToDouble(Long::doubleValue)
- .reduce(0.0, (sum, num) -> Double.sum(sum, Math.pow(num - mean, 2)) / bucketSizes.size()));
-
- double rsdThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD);
- //Relative standard deviation: If the standard deviation is larger than rsdThreshold * average_bucket_size,
- // a rebalancing compaction is initiated.
- if (standardDeviation > mean * rsdThreshold) {
- LOG.debug("Initiating REBALANCE compaction on table {}",
ci.tableName);
- return true;
- }
- } catch (IOException e) {
- LOG.error("Error occured during checking bucket file sizes,
rebalance threshold calculation is skipped.", e);
- }
- } else {
- LOG.debug("Table is smaller than the minimum required size for
REBALANCE compaction.");
- }
- }
- return false;
- }
-
private long getBaseSize(AcidDirectory dir) throws IOException {
long baseSize = 0;
if (dir.getBase() != null) {
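Note: the deleted scheduleRebalance method triggered on relative standard deviation: if the std dev of the per-bucket sizes exceeded rsdThreshold * mean, a REBALANCE compaction was initiated. A self-contained sketch of that check (bucket sizes and threshold below are made up; the variance is computed in the conventional way rather than via the removed code's stream reduce):

    import java.util.Map;

    public class RebalanceCheckSketch {
      // True when the bucket-size skew exceeds the relative standard deviation threshold.
      static boolean shouldRebalance(Map<Integer, Long> bucketSizes, double rsdThreshold) {
        double mean = bucketSizes.values().stream().mapToLong(Long::longValue).sum()
            / (double) bucketSizes.size();
        double variance = bucketSizes.values().stream()
            .mapToDouble(size -> Math.pow(size - mean, 2))
            .sum() / bucketSizes.size();
        return Math.sqrt(variance) > mean * rsdThreshold;
      }

      public static void main(String[] args) {
        // Hypothetical sizes in bytes: bucket 0 holds far more data than the others.
        Map<Integer, Long> sizes = Map.of(0, 400L, 1, 100L, 2, 100L, 3, 100L);
        // mean = 175, std dev ~= 129.9 > 0.2 * 175 = 35, so a rebalance would be scheduled.
        System.out.println(shouldRebalance(sizes, 0.2));
      }
    }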
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java
index 698346b9b72..50d04f8b1b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.session.SessionState;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index b0bec164fce..b1e690f16d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -185,8 +185,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
boolean isEnoughToCompact;
if (ci.isRebalanceCompaction()) {
- //However thresholds are used to schedule REBALANCE compaction, manual triggering is always allowed if the
- //table and query engine supports it
+ //TODO: For now, we are allowing rebalance compaction regardless of the table state. Thresholds will be added later.
return true;
} else if (ci.isMajorCompaction()) {
isEnoughToCompact =
@@ -282,7 +281,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
findNextCompactRequest.setWorkerVersion(runtimeVersion);
findNextCompactRequest.setPoolName(this.getPoolName());
ci = CompactionInfo.optionalCompactionInfoStructToInfo(msc.findNextCompact(findNextCompactRequest));
- ci.errorMessage = "";
LOG.info("Processing compaction request {}", ci);
if (ci == null) {
@@ -330,19 +328,10 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
return false;
}
- if (LOG.isWarnEnabled()) {
- boolean insertOnly = AcidUtils.isInsertOnlyTable(table.getParameters());
- if (ci.type.equals(CompactionType.REBALANCE) && insertOnly) {
- ci.errorMessage += "Falling back to MAJOR compaction as REBALANCE
compaction is supported only on full-acid tables. ";
- LOG.warn("REBALANCE compaction requested on an insert-only table
({}). Falling back to MAJOR compaction as " +
- "REBALANCE compaction is supported only on full-acid tables",
table.getTableName());
- }
- if ((!ci.type.equals(CompactionType.REBALANCE) || insertOnly) && ci.numberOfBuckets > 0) {
- ci.errorMessage += "Only REBALANCE compaction on a full-acid table accepts the number of buckets clause. " +
- "(CLUSTERED INTO {N} BUCKETS).";
- LOG.warn("Only REBALANCE compaction on a full-acid table accepts the
number of buckets clause " +
- "(CLUSTERED INTO {N} BUCKETS). Since the compaction request
is {} and the table is {}, it will be ignored.",
- ci.type, insertOnly ? "insert-only" : "full-acid");
+ if (!ci.type.equals(CompactionType.REBALANCE) && ci.numberOfBuckets > 0) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Only the REBALANCE compaction accepts the number of
buckets clause (CLUSTERED INTO {N} BUCKETS). " +
+ "Since the compaction request is {}, it will be ignored.",
ci.type);
}
}
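Note: after this change the warning no longer depends on whether the table is full-acid; only the compaction type matters. The "number of buckets clause" comes from the manual compaction statement. Hypothetical requests in the style of the test helpers deleted above ("t1" is a placeholder table name):

    // Accepted: a REBALANCE request may pin the target bucket count.
    executeStatementOnDriver("ALTER TABLE t1 COMPACT 'rebalance' CLUSTERED INTO 4 BUCKETS", driver);
    // Ignored with the warning above: the clause has no effect on a MAJOR request.
    executeStatementOnDriver("ALTER TABLE t1 COMPACT 'major' CLUSTERED INTO 4 BUCKETS", driver);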
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 7f690dbd6fc..6a9a37dfe3f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -72,76 +72,6 @@ public class TestInitiator extends CompactorTest {
startInitiator();
}
- @Test
- public void compactRebalance() throws Exception {
- HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
- HiveConf.setBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
- //Set the tresholds to reach the rebalance compaction threshold without reaching the major compaction threshold.
- MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE, 100);
- MetastoreConf.setDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD, 0.02);
-
- prepareRebalanceData();
- startInitiator();
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("initiated", compacts.get(0).getState());
- Assert.assertEquals("rebalance", compacts.get(0).getTablename());
- Assert.assertEquals(CompactionType.REBALANCE, compacts.get(0).getType());
- }
-
- @Test
- public void noCompactRebalanceSmallTable() throws Exception {
- HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
- HiveConf.setBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
-
- prepareRebalanceData();
- startInitiator();
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(0, compacts.size());
- }
-
- @Test
- public void noCompactRebalanceDataBalanced() throws Exception {
- HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
- HiveConf.setBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
- //Set minimum size to let initiator check, but doesn't modify rebalance threshold. No rebalance compaciton should
- //be initiated.
- MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE, 100);
-
- prepareRebalanceData();
- startInitiator();
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(0, compacts.size());
- }
-
- private void prepareRebalanceData() throws Exception {
- Table t = newTable("default", "rebalance", false);
-
- addBaseFile(t, null, 200L, 200, 2, true);
- addDeltaFile(t, null, 201L, 220L, 19, 2, false);
-
- burnThroughTransactions("default", "rebalance", 220);
-
- long txnid = openTxn();
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
- comp.setTablename("rebalance");
- comp.setOperationType(DataOperationType.UPDATE);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- txnHandler.lock(req);
- long writeid = allocateWriteId("default", "rebalance", txnid);
- Assert.assertEquals(221, writeid);
- txnHandler.commitTxn(new CommitTxnRequest(txnid));
- }
-
@Test
public void recoverFailedLocalWorkers() throws Exception {
Table t = newTable("default", "rflw1", false);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 20d30aa80ef..4af26205dcb 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -435,15 +435,6 @@ public class MetastoreConf {
"Time after Initiator will ignore
metastore.compactor.initiator.failed.compacts.threshold "
+ "and retry with compaction again. This will try to auto heal
tables with previous failed compaction "
+ "without manual intervention. Setting it to 0 or negative value
will disable this feature."),
- COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE("metastore.compactor.initiator.rebalance.min.size",
- "hive.compactor.initiator.rebalance.min.size", 1024*1024*100,
- "Minimum table/partition size for which a rebalancing compaction can be initiated."),
- COMPACTOR_INITIATOR_REBALANCE_THRESHOLD("metastore.compactor.initiator.rebalance.threshold",
- "hive.compactor.initiator.rebalance.threshold", 0.2d,
- "Threshold for the rebalancing compaction. If the
std_dev/average_bucket_size (where std_dev is the " +
- "standard deviation of the bucket sizes) is larger than the
threshold, a rebalance compaction is initiated. " +
- "In other words (assuming that the value is 0.2): If the standard
deviation is larger than 20% of the average " +
- "bucket size, a rebalancing compaction is initiated. "),
COMPACTOR_RUN_AS_USER("metastore.compactor.run.as.user",
"hive.compactor.run.as.user", "",
"Specify the user to run compactor Initiator and Worker as. If empty
string, defaults to table/partition " +
"directory owner."),
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index ea24cb3ca41..f6529a5895c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -342,17 +342,15 @@ class CompactionTxnHandler extends TxnHandler {
public void markCompacted(CompactionInfo info) throws MetaException {
try {
Connection dbConn = null;
- PreparedStatement pstmt = null;
+ Statement stmt = null;
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
- String sql = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = ?, "
- + "\"CQ_WORKER_ID\" = NULL, \"CQ_ERROR_MESSAGE\" = ? WHERE
\"CQ_ID\" = ?";
- pstmt = dbConn.prepareStatement(sql);
- pstmt.setString(1, Character.toString(READY_FOR_CLEANING));
- pstmt.setString(2, StringUtils.isNotBlank(info.errorMessage) ? info.errorMessage : null);
- pstmt.setLong(3, info.id);
- LOG.debug("Going to execute update <{}>", sql);
- int updCnt = pstmt.executeUpdate();
+ stmt = dbConn.createStatement();
+ String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" +
READY_FOR_CLEANING + "', "
+ + "\"CQ_WORKER_ID\" = NULL"
+ + " WHERE \"CQ_ID\" = " + info.id;
+ LOG.debug("Going to execute update <{}>", s);
+ int updCnt = stmt.executeUpdate(s);
if (updCnt != 1) {
LOG.error("Unable to set cq_state={} for compaction record: {}.
updCnt={}", READY_FOR_CLEANING, info, updCnt);
LOG.debug("Going to rollback");
@@ -368,7 +366,7 @@ class CompactionTxnHandler extends TxnHandler {
throw new MetaException("Unable to connect to transaction database " + e.getMessage());
} finally {
- closeStmt(pstmt);
+ closeStmt(stmt);
closeDbConn(dbConn);
}
} catch (RetryException e) {
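Note: both values inlined by the post-revert statement are server-controlled (READY_FOR_CLEANING is a single-character state constant and info.id a numeric key), so no user input reaches the concatenated SQL; the PreparedStatement being reverted existed mainly to carry the now-removed CQ_ERROR_MESSAGE column. A simplified try-with-resources sketch of the same update, as it would sit inside markCompacted (error handling and the retry wrapper are omitted):

    // Sketch only: the real markCompacted also logs, throws MetaException and retries.
    try (Statement stmt = dbConn.createStatement()) {
      String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING
          + "', \"CQ_WORKER_ID\" = NULL WHERE \"CQ_ID\" = " + info.id;
      if (stmt.executeUpdate(s) != 1) {
        dbConn.rollback();   // no matching queue entry: undo and report
      } else {
        dbConn.commit();
      }
    }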