This is an automated email from the ASF dual-hosted git repository.

veghlaci05 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 29dc08172ba HIVE-27081: Revert HIVE-26717 and HIVE-26718 (Laszlo Vegh, 
reviewed by Laszlo Bodor)
29dc08172ba is described below

commit 29dc08172ba9ae3a3f51320f656994c20198c5f0
Author: veghlaci05 <[email protected]>
AuthorDate: Wed Feb 15 09:05:45 2023 +0100

    HIVE-27081: Revert HIVE-26717 and HIVE-26718 (Laszlo Vegh, reviewed by 
Laszlo Bodor)
    
    * Revert "HIVE-26717: Query based Rebalance compaction on insert-only 
tables (Laszlo Vegh, reviewed by Denys Kuzmenko, Krisztian Kasa)"
    
    This reverts commit f2e908c5
    
    * Revert "HIVE-26718: Enable initiator to schedule rebalancing compactions 
(Laszlo Vegh, reviewed by Krisztian Kasa)"
    
    This reverts commit 919e734d3b67902a721afab1ca9a803855dbf2ec.
---
 .../hive/ql/txn/compactor/CompactorTestUtil.java   |   6 +-
 .../ql/txn/compactor/TestCrudCompactorOnTez.java   | 120 ---------------------
 .../hive/ql/txn/compactor/CompactorFactory.java    |  12 +--
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |  51 +--------
 .../hadoop/hive/ql/txn/compactor/StatsUpdater.java |   1 +
 .../hadoop/hive/ql/txn/compactor/Worker.java       |  21 +---
 .../hive/ql/txn/compactor/TestInitiator.java       |  70 ------------
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |   9 --
 .../hive/metastore/txn/CompactionTxnHandler.java   |  18 ++--
 9 files changed, 19 insertions(+), 289 deletions(-)

diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
index 8dd6087a66a..e157f641c56 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
@@ -102,13 +102,9 @@ class CompactorTestUtil {
    */
   static List<String> getBucketFileNames(FileSystem fs, Table table, String 
partitionName, String deltaName)
       throws IOException {
-    boolean insertOnly = AcidUtils.isInsertOnlyTable(table.getParameters());
     Path path = partitionName == null ? new Path(table.getSd().getLocation(), 
deltaName) : new Path(
         new Path(table.getSd().getLocation()), new Path(partitionName, 
deltaName));
-    return Arrays.stream(fs.listStatus(path, insertOnly? 
AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter))
-        .map(FileStatus::getPath)
-        .map(Path::getName)
-        .sorted()
+    return Arrays.stream(fs.listStatus(path, 
AcidUtils.bucketFileFilter)).map(FileStatus::getPath).map(Path::getName).sorted()
         .collect(Collectors.toList());
   }
 
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index c4d5fde2e12..dd5686ac84c 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -37,14 +37,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionResponse;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
@@ -60,17 +58,11 @@ import 
org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook;
 import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents;
-import org.apache.hadoop.hive.ql.io.AcidDirectory;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
-import org.apache.hadoop.hive.ql.txn.compactor.Compactor;
-import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext;
-import org.apache.hadoop.hive.ql.txn.compactor.CompactorFactory;
-import org.apache.hadoop.hive.ql.txn.compactor.CompactorPipeline;
-import org.apache.hadoop.hive.ql.txn.compactor.Worker;
 import org.apache.hive.streaming.HiveStreamingConnection;
 import org.apache.hive.streaming.StreamingConnection;
 import org.apache.hive.streaming.StrictDelimitedInputWriter;
@@ -84,7 +76,6 @@ import org.apache.orc.impl.RecordReaderImpl;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.FieldSetter;
 
 import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.runWorker;
@@ -464,117 +455,6 @@ public class TestCrudCompactorOnTez extends 
CompactorOnTezTest {
     }
   }
 
-  @Test
-  public void 
testMMRebalanceCompactionOfNotPartitionedImplicitlyBucketedTable() throws 
Exception {
-    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
-    conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
-    conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
-
-    //set grouping size to have 3 buckets, and re-create driver with the new 
config
-    conf.set("tez.grouping.min-size", "200");
-    conf.set("tez.grouping.max-size", "80000");
-    driver = new Driver(conf);
-
-    final String stageTableName = "stage_rebalance_test";
-    final String tableName = "rebalance_test";
-
-    TestDataProvider testDataProvider = new TestDataProvider();
-    testDataProvider.createFullAcidTable(stageTableName, true, false);
-    testDataProvider.insertTestDataPartitioned(stageTableName);
-
-    executeStatementOnDriver("drop table if exists " + tableName, driver);
-    executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int) 
" +
-        "STORED AS ORC TBLPROPERTIES('transactional'='true', 
'transactional_properties'='insert_only')", driver);
-    executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select 
a, b from " + stageTableName, driver);
-
-    //do some single inserts to have more data in the first bucket.
-    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values 
('12',12)", driver);
-    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values 
('13',13)", driver);
-    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values 
('14',14)", driver);
-    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values 
('15',15)", driver);
-    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values 
('16',16)", driver);
-    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values 
('17',17)", driver);
-
-    Table table = msClient.getTable("default", tableName);
-    FileSystem fs = FileSystem.get(conf);
-
-    // Verify buckets and their content before rebalance
-    Path basePath = new Path(table.getSd().getLocation(), "base_0000001");
-    List<String> bucketFileNames = CompactorTestUtil.getBucketFileNames(fs, 
table, null, "base_0000001");
-
-    Assert.assertEquals("Test setup does not match the expected: different 
buckets",
-        Arrays.asList("000000_0", "000001_0", "000002_0", "000003_0", 
"000004_0", "000005_0", "000006_0"),
-        bucketFileNames);
-
-    Map<String, Integer> expectedRowCount = new HashMap<String, Integer>() {{
-      put("000000_0", 4);
-      put("000001_0", 2);
-      put("000002_0", 1);
-      put("000003_0", 2);
-      put("000004_0", 1);
-      put("000005_0", 1);
-      put("000006_0", 1);
-    }};
-    for (String bucketFileName : bucketFileNames) {
-      try(Reader reader = OrcFile.createReader(new Path(basePath, 
bucketFileName), OrcFile.readerOptions(conf))) {
-        Assert.assertEquals("Row count in bucket " + bucketFileName + " does 
not match before compaction",
-            (int) expectedRowCount.get(bucketFileName), (int) 
reader.getNumberOfRows());
-      }
-    }
-
-    //Try to do a rebalancing compaction
-    executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 
'rebalance'", driver);
-
-    CompactorFactory factory = Mockito.spy(CompactorFactory.getInstance());
-    AtomicReference<Compactor> reference = new AtomicReference<>();
-    doAnswer(invocation -> {
-      CompactorPipeline result = 
(CompactorPipeline)invocation.callRealMethod();
-      // Use reflection to fetch inner compactors
-      CompactorPipeline compactorPipeline = (CompactorPipeline) result;
-      Field field = compactorPipeline.getClass().getDeclaredField("compactor");
-      field.setAccessible(true);
-      Compactor compactor = (Compactor) field.get(compactorPipeline);
-      reference.set(compactor);
-      return result;
-    }).when(factory).getCompactorPipeline(any(), any(), any(), any());
-
-
-    Worker worker = new Worker(factory);
-    worker.setConf(conf);
-    worker.init(new AtomicBoolean(true));
-    worker.run();
-
-    //Check if MmMajorQueryCompactor was used as fallback.
-    Assert.assertEquals("Wrong compactor were chosen.", 
MmMajorQueryCompactor.class, reference.get().getClass());
-
-    //Check if the compaction succeed
-    List<ShowCompactResponseElement> compactions = verifyCompaction(1, 
TxnStore.CLEANING_RESPONSE);
-    Assert.assertTrue("no senor", compactions.get(0).getErrorMessage()
-        .contains("Falling back to MAJOR compaction as REBALANCE compaction is 
supported only on full-acid tables."));
-
-    // Verify buckets and their content after rebalance
-    basePath = new Path(table.getSd().getLocation(), "base_0000007_v0000017");
-    bucketFileNames = CompactorTestUtil.getBucketFileNames(fs, table, null, 
"base_0000007_v0000017");
-
-    Assert.assertEquals("Buckets does not match after compaction",
-        Arrays.asList("000000_0", "000001_0", "000002_0", "000003_0", 
"000004_0"),
-        bucketFileNames);
-
-    expectedRowCount = new HashMap<String, Integer>() {{
-      put("000000_0", 4);
-      put("000001_0", 4);
-      put("000002_0", 4);
-      put("000003_0", 4);
-      put("000004_0", 2);
-    }};
-    for (String bucketFileName : bucketFileNames) {
-      try(Reader reader = OrcFile.createReader(new Path(basePath, 
bucketFileName), OrcFile.readerOptions(conf))) {
-        Assert.assertEquals("Row count in bucket " + bucketFileName + " does 
not match before compaction",
-            (int) expectedRowCount.get(bucketFileName), (int) 
reader.getNumberOfRows());
-      }
-    }
-  }
-
   @Test
   public void testCompactionShouldNotFailOnPartitionsWithBooleanField() throws 
Exception {
     conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorFactory.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorFactory.java
index ba8b2d207f3..786391c48e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorFactory.java
@@ -104,21 +104,13 @@ public final class CompactorFactory {
         case MAJOR:
           return new CompactorPipeline(new MergeCompactor())
                   .addCompactor(new MmMajorQueryCompactor());
-        case REBALANCE:
-          // REBALANCE COMPACTION on an insert-only table is simply a MAJOR 
compaction. Since there is no ACID row data,
-          // there is no acid row order to keep, and the number of buckets 
cannot be set at all (it will be calculated
-          // and created by TEZ dynamically). Initiator won't schedule 
REBALANCE compactions for insert-only tables,
-          // however users can request it. In these cases we simply fall back 
to MAJOR compaction
-          return new CompactorPipeline(new MmMajorQueryCompactor());
         default:
           throw new HiveException(
               compactionInfo.type.name() + " compaction is not supported on 
insert only tables.");
       }
-    } else {
-      throw new HiveException("Only transactional tables can be compacted, " + 
table.getTableName() + "is not suitable " +
-          "for compaction!");
     }
-    throw new HiveException("No suitable compactor found for table: " + 
table.getTableName());
+    throw new HiveException("Only transactional tables can be compacted, " + 
table.getTableName() + "is not suitable " +
+        "for compaction!");
   }
 
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 471f9fa88f7..ee7727b69ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -460,8 +460,8 @@ public class Initiator extends MetaStoreCompactorThread {
     return AcidUtils.getAcidState(fs, location, conf, writeIds, 
Ref.from(false), false);
   }
 
-  private CompactionType determineCompactionType(CompactionInfo ci, 
AcidDirectory dir,
-                                                 Map<String, String> 
tblproperties, long baseSize, long deltaSize) {
+  private CompactionType determineCompactionType(CompactionInfo ci, 
AcidDirectory dir, Map<String,
+      String> tblproperties, long baseSize, long deltaSize) {
     boolean noBase = false;
     List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
     if (baseSize == 0 && deltaSize > 0) {
@@ -501,10 +501,6 @@ public class Initiator extends MetaStoreCompactorThread {
       if (initiateMajor) return CompactionType.MAJOR;
     }
 
-    if (scheduleRebalance(ci, dir, tblproperties, baseSize, deltaSize)) {
-      return CompactionType.REBALANCE;
-    }
-
     String deltaNumProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX +
         HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
     int deltaNumThreshold = deltaNumProp == null ?
@@ -524,49 +520,6 @@ public class Initiator extends MetaStoreCompactorThread {
             CompactionType.MAJOR : CompactionType.MINOR;
   }
 
-  private boolean scheduleRebalance(CompactionInfo ci, AcidDirectory dir, 
Map<String, String> tblproperties, long baseSize, long deltaSize) {
-    // bucket size calculation can be resource intensive if there are numerous 
deltas, so we check for rebalance
-    // compaction only if the table is in an acceptable shape: no major 
compaction required. This means the number of
-    // files shouldn't be too high
-    if (AcidUtils.isFullAcidTable(tblproperties)) {
-      // We check only full-acid tables, for insert-only tables a MAJOR 
compaction will also rebalance bucket data.
-      long totalSize = baseSize + deltaSize;
-      long minimumSize = MetastoreConf.getLongVar(conf, 
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE);
-      if (totalSize > minimumSize) {
-        try {
-          Map<Integer, Long> bucketSizes = new HashMap<>();
-          //compute the size of each bucket
-          dir.getFiles().stream()
-              .filter(f -> 
AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
-              .forEach(
-                  f -> bucketSizes.merge(
-                      
AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
-                      f.getHdfsFileStatusWithId().getFileStatus().getLen(),
-                      Long::sum));
-          final double mean = (double) totalSize / bucketSizes.size();
-
-          // calculate the standard deviation
-          double standardDeviation = Math.sqrt(
-              bucketSizes.values().stream().mapToDouble(Long::doubleValue)
-                  .reduce(0.0, (sum, num) -> Double.sum(sum, Math.pow(num - 
mean, 2)) / bucketSizes.size()));
-
-          double rsdThreshold = MetastoreConf.getDoubleVar(conf, 
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD);
-          //Relative standard deviation: If the standard deviation is larger 
than rsdThreshold * average_bucket_size,
-          // a rebalancing compaction is initiated.
-          if (standardDeviation > mean * rsdThreshold) {
-            LOG.debug("Initiating REBALANCE compaction on table {}", 
ci.tableName);
-            return true;
-          }
-        } catch (IOException e) {
-          LOG.error("Error occured during checking bucket file sizes, 
rebalance threshold calculation is skipped.", e);
-        }
-      } else {
-        LOG.debug("Table is smaller than the minimum required size for 
REBALANCE compaction.");
-      }
-    }
-    return false;
-  }
-
   private long getBaseSize(AcidDirectory dir) throws IOException {
     long baseSize = 0;
     if (dir.getBase() != null) {
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java
index 698346b9b72..50d04f8b1b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.DriverUtils;
 import org.apache.hadoop.hive.ql.session.SessionState;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index b0bec164fce..b1e690f16d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -185,8 +185,7 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
     boolean isEnoughToCompact;
 
     if (ci.isRebalanceCompaction()) {
-      //However thresholds are used to schedule REBALANCE compaction, manual 
triggering is always allowed if the
-      //table and query engine supports it
+      //TODO: For now, we are allowing rebalance compaction regardless of the 
table state. Thresholds will be added later.
       return true;
     } else if (ci.isMajorCompaction()) {
       isEnoughToCompact =
@@ -282,7 +281,6 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
       findNextCompactRequest.setWorkerVersion(runtimeVersion);
       findNextCompactRequest.setPoolName(this.getPoolName());
       ci = 
CompactionInfo.optionalCompactionInfoStructToInfo(msc.findNextCompact(findNextCompactRequest));
-      ci.errorMessage = "";
       LOG.info("Processing compaction request {}", ci);
 
       if (ci == null) {
@@ -330,19 +328,10 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
         return false;
       }
 
-      if (LOG.isWarnEnabled()) {
-        boolean insertOnly = 
AcidUtils.isInsertOnlyTable(table.getParameters());
-        if (ci.type.equals(CompactionType.REBALANCE) && insertOnly) {
-          ci.errorMessage += "Falling back to MAJOR compaction as REBALANCE 
compaction is supported only on full-acid tables. ";
-          LOG.warn("REBALANCE compaction requested on an insert-only table 
({}). Falling back to MAJOR compaction as " +
-              "REBALANCE compaction is supported only on full-acid tables", 
table.getTableName());
-        }
-        if ((!ci.type.equals(CompactionType.REBALANCE) || insertOnly) && 
ci.numberOfBuckets > 0) {
-          ci.errorMessage += "Only REBALANCE compaction on a full-acid table 
accepts the number of buckets clause. " +
-              "(CLUSTERED INTO {N} BUCKETS).";
-          LOG.warn("Only REBALANCE compaction on a full-acid table accepts the 
number of buckets clause " +
-                  "(CLUSTERED INTO {N} BUCKETS). Since the compaction request 
is {} and the table is {}, it will be ignored.",
-              ci.type, insertOnly ? "insert-only" : "full-acid");
+      if (!ci.type.equals(CompactionType.REBALANCE) && ci.numberOfBuckets > 0) 
{
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Only the REBALANCE compaction accepts the number of 
buckets clause (CLUSTERED INTO {N} BUCKETS). " +
+              "Since the compaction request is {}, it will be ignored.", 
ci.type);
         }
       }
 
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 7f690dbd6fc..6a9a37dfe3f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -72,76 +72,6 @@ public class TestInitiator extends CompactorTest {
     startInitiator();
   }
 
-  @Test
-  public void compactRebalance() throws Exception {
-    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
-    HiveConf.setBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, 
true);
-    //Set the thresholds to reach the rebalance compaction threshold without 
reaching the major compaction threshold.
-    MetastoreConf.setLongVar(conf, 
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE, 100);
-    MetastoreConf.setDoubleVar(conf, 
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD, 0.02);
-
-    prepareRebalanceData();
-    startInitiator();
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("initiated", compacts.get(0).getState());
-    Assert.assertEquals("rebalance", compacts.get(0).getTablename());
-    Assert.assertEquals(CompactionType.REBALANCE, compacts.get(0).getType());
-  }
-
-  @Test
-  public void noCompactRebalanceSmallTable() throws Exception {
-    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
-    HiveConf.setBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, 
true);
-
-    prepareRebalanceData();
-    startInitiator();
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(0, compacts.size());
-  }
-
-  @Test
-  public void noCompactRebalanceDataBalanced() throws Exception {
-    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
-    HiveConf.setBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, 
true);
-    //Set minimum size to let initiator check, but doesn't modify rebalance 
threshold. No rebalance compaction should
-    //be initiated.
-    MetastoreConf.setLongVar(conf, 
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE, 100);
-
-    prepareRebalanceData();
-    startInitiator();
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(0, compacts.size());
-  }
-
-  private void prepareRebalanceData() throws Exception {
-    Table t = newTable("default", "rebalance", false);
-
-    addBaseFile(t, null, 200L, 200, 2, true);
-    addDeltaFile(t, null, 201L, 220L, 19, 2, false);
-
-    burnThroughTransactions("default", "rebalance", 220);
-
-    long txnid = openTxn();
-    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.PARTITION, "default");
-    comp.setTablename("rebalance");
-    comp.setOperationType(DataOperationType.UPDATE);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
-    txnHandler.lock(req);
-    long writeid = allocateWriteId("default", "rebalance", txnid);
-    Assert.assertEquals(221, writeid);
-    txnHandler.commitTxn(new CommitTxnRequest(txnid));
-  }
-
   @Test
   public void recoverFailedLocalWorkers() throws Exception {
     Table t = newTable("default", "rflw1", false);
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 20d30aa80ef..4af26205dcb 100644
--- 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -435,15 +435,6 @@ public class MetastoreConf {
         "Time after Initiator will ignore 
metastore.compactor.initiator.failed.compacts.threshold "
             + "and retry with compaction again. This will try to auto heal 
tables with previous failed compaction "
             + "without manual intervention. Setting it to 0 or negative value 
will disable this feature."),
-    
COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE("metastore.compactor.initiator.rebalance.min.size",
-        "hive.compactor.initiator.rebalance.min.size", 1024*1024*100,
-        "Minimum table/partition size for which a rebalancing compaction can 
be initiated."),
-    
COMPACTOR_INITIATOR_REBALANCE_THRESHOLD("metastore.compactor.initiator.rebalance.threshold",
-        "hive.compactor.initiator.rebalance.threshold", 0.2d,
-        "Threshold for the rebalancing compaction. If the 
std_dev/average_bucket_size (where std_dev is the " +
-            "standard deviation of the bucket sizes) is larger than the 
threshold, a rebalance compaction is initiated. " +
-            "In other words (assuming that the value is 0.2): If the standard 
deviation is larger than 20% of the average " +
-            "bucket size, a rebalancing compaction is initiated. "),
     COMPACTOR_RUN_AS_USER("metastore.compactor.run.as.user", 
"hive.compactor.run.as.user", "",
         "Specify the user to run compactor Initiator and Worker as. If empty 
string, defaults to table/partition " +
         "directory owner."),
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index ea24cb3ca41..f6529a5895c 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -342,17 +342,15 @@ class CompactionTxnHandler extends TxnHandler {
   public void markCompacted(CompactionInfo info) throws MetaException {
     try {
       Connection dbConn = null;
-      PreparedStatement pstmt = null;
+      Statement stmt = null;
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
-        String sql = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = ?, "
-            + "\"CQ_WORKER_ID\" = NULL, \"CQ_ERROR_MESSAGE\" = ? WHERE 
\"CQ_ID\" = ?";
-        pstmt = dbConn.prepareStatement(sql);
-        pstmt.setString(1, Character.toString(READY_FOR_CLEANING));
-        pstmt.setString(2, StringUtils.isNotBlank(info.errorMessage) ? 
info.errorMessage : null);
-        pstmt.setLong(3, info.id);
-        LOG.debug("Going to execute update <{}>", sql);
-        int updCnt = pstmt.executeUpdate();
+        stmt = dbConn.createStatement();
+        String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + 
READY_FOR_CLEANING + "', "
+            + "\"CQ_WORKER_ID\" = NULL"
+            + " WHERE \"CQ_ID\" = " + info.id;
+        LOG.debug("Going to execute update <{}>", s);
+        int updCnt = stmt.executeUpdate(s);
         if (updCnt != 1) {
           LOG.error("Unable to set cq_state={} for compaction record: {}. 
updCnt={}", READY_FOR_CLEANING, info, updCnt);
           LOG.debug("Going to rollback");
@@ -368,7 +366,7 @@ class CompactionTxnHandler extends TxnHandler {
         throw new MetaException("Unable to connect to transaction database " +
           e.getMessage());
       } finally {
-        closeStmt(pstmt);
+        closeStmt(stmt);
         closeDbConn(dbConn);
       }
     } catch (RetryException e) {

Reply via email to