[ 
https://issues.apache.org/jira/browse/HIVE-21052?focusedWorklogId=497358&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-497358
 ]

ASF GitHub Bot logged work on HIVE-21052:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Oct/20 14:17
            Start Date: 08/Oct/20 14:17
    Worklog Time Spent: 10m 
      Work Description: deniskuzZ commented on a change in pull request #1548:
URL: https://github.com/apache/hive/pull/1548#discussion_r501757123



##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -853,6 +857,273 @@ public void majorCompactAfterAbort() throws Exception {
             Lists.newArrayList(5, 6), 1);
   }
 
+  @Test
+  public void testCleanAbortCompactAfterAbortTwoPartitions() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+
+    HiveStreamingConnection connection1 = 
prepareTableTwoPartitionsAndConnection(dbName, tblName, 1);
+    HiveStreamingConnection connection2 = 
prepareTableTwoPartitionsAndConnection(dbName, tblName, 1);
+
+    connection1.beginTransaction();
+    connection1.write("1,1".getBytes());
+    connection1.write("2,2".getBytes());
+    connection1.abortTransaction();
+
+    connection2.beginTransaction();
+    connection2.write("1,3".getBytes());
+    connection2.write("2,3".getBytes());
+    connection2.write("3,3".getBytes());
+    connection2.abortTransaction();
+
+    assertAndCompactCleanAbort(dbName, tblName);
+
+    connection1.close();
+    connection2.close();
+  }
+
+  @Test
+  public void testCleanAbortCompactAfterAbort() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+
+    // Create three folders with two different transactions
+    HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, 
tblName, 1);
+    HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, 
tblName, 1);
+
+    connection1.beginTransaction();
+    connection1.write("1,1".getBytes());
+    connection1.write("2,2".getBytes());
+    connection1.abortTransaction();
+
+    connection2.beginTransaction();
+    connection2.write("1,3".getBytes());
+    connection2.write("2,3".getBytes());
+    connection2.write("3,3".getBytes());
+    connection2.abortTransaction();
+
+    assertAndCompactCleanAbort(dbName, tblName);
+
+    connection1.close();
+    connection2.close();
+  }
+
+  private void assertAndCompactCleanAbort(String dbName, String tblName) 
throws Exception {
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat =
+        fs.listStatus(new Path(table.getSd().getLocation()));
+    if (3 != stat.length) {
+      Assert.fail("Expecting three directories corresponding to three 
partitions, FileStatus[] stat " + Arrays.toString(stat));
+    }
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS where TC_OPERATION_TYPE='p'");
+    // We should have two rows corresponding to the two aborted transactions
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 2, count);
+
+    runInitiator(conf);
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_TYPE='p'");
+    // Only one job is added to the queue per table. This job corresponds to 
all the entries for a particular table
+    // with rows in TXN_COMPONENTS
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 1, count);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompacts().size());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, 
rsp.getCompacts().get(0).getState());
+    Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename());
+    Assert.assertEquals(CompactionType.CLEAN_ABORTED,
+        rsp.getCompacts().get(0).getType());
+
+    runCleaner(conf);
+
+    // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have 
zero rows, also the folders should have been deleted.
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 0, count);
+
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 0, count);
+
+    RemoteIterator it =
+        fs.listFiles(new Path(table.getSd().getLocation()), true);
+    if (it.hasNext()) {
+      Assert.fail("Expecting compaction to have cleaned the directories, 
FileStatus[] stat " + Arrays.toString(stat));
+    }
+
+    rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompacts().size());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, 
rsp.getCompacts().get(0).getState());
+    Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename());
+    Assert.assertEquals(CompactionType.CLEAN_ABORTED,
+        rsp.getCompacts().get(0).getType());
+  }
+
+  @Test
+  public void testCleanAbortCompactSeveralTables() throws Exception {
+    String dbName = "default";
+    String tblName1 = "cws1";
+    String tblName2 = "cws2";
+
+    HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, 
tblName1, 1);
+    HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, 
tblName2, 1);
+
+    connection1.beginTransaction();
+    connection1.write("1,1".getBytes());
+    connection1.write("2,2".getBytes());
+    connection1.abortTransaction();
+
+    connection2.beginTransaction();
+    connection2.write("1,1".getBytes());
+    connection2.write("2,2".getBytes());
+    connection2.abortTransaction();
+
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    FileSystem fs = FileSystem.get(conf);
+    Table table1 = msClient.getTable(dbName, tblName1);
+    FileStatus[] stat =
+        fs.listStatus(new Path(table1.getSd().getLocation()));
+    if (2 != stat.length) {
+      Assert.fail("Expecting two directories corresponding to two partitions, 
FileStatus[] stat " + Arrays.toString(stat));
+    }
+    Table table2 = msClient.getTable(dbName, tblName2);
+    stat = fs.listStatus(new Path(table2.getSd().getLocation()));
+    if (2 != stat.length) {
+      Assert.fail("Expecting two directories corresponding to two partitions, 
FileStatus[] stat " + Arrays.toString(stat));
+    }
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS where TC_OPERATION_TYPE='p'");
+    // We should have two rows corresponding to the two aborted transactions
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 2, count);
+
+    runInitiator(conf);
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_TYPE='p'");
+    // Only one job is added to the queue per table. This job corresponds to 
all the entries for a particular table
+    // with rows in TXN_COMPONENTS
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 2, count);
+
+    runCleaner(conf);
+
+    // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have 
zero rows, also the folders should have been deleted.
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 0, count);
+
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 0, count);
+
+    RemoteIterator it =
+        fs.listFiles(new Path(table1.getSd().getLocation()), true);
+    if (it.hasNext()) {
+      Assert.fail("Expecting compaction to have cleaned the directories, 
FileStatus[] stat " + Arrays.toString(stat));
+    }
+
+    connection1.close();
+    connection2.close();
+  }
+
+  @Test
+  public void testCleanAbortCorrectlyCleaned() throws Exception {
+    // Test that at commit the tables are cleaned properly
+    String dbName = "default";
+    String tblName = "cws";
+    HiveStreamingConnection connection = prepareTableAndConnection(dbName, 
tblName, 1);
+    connection.beginTransaction();
+    connection.write("1,1".getBytes());
+    connection.write("2,2".getBytes());
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS where TC_OPERATION_TYPE='p'");
+    // We should have two rows corresponding to the two aborted transactions
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 1, count);
+
+    connection.commitTransaction();
+
+    // After commit the row should have been deleted
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS where TC_OPERATION_TYPE='p'");
+    // We should have two rows corresponding to the two aborted transactions
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 0, count);
+  }
+
+  @Test
+  public void testCleanAbortAndMinorCompact() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+
+    HiveStreamingConnection connection = prepareTableAndConnection(dbName, 
tblName, 1);
+
+    connection.beginTransaction();
+    connection.write("1,1".getBytes());
+    connection.write("2,2".getBytes());
+    connection.abortTransaction();
+
+    executeStatementOnDriver("insert into " + tblName + " partition (a) values 
(1, '1')", driver);
+    executeStatementOnDriver("delete from " + tblName + " where b=1", driver);
+
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0);
+    runInitiator(conf);
+    runWorker(conf);
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 2, count);
+    // Cleaning should happen in threads concurrently for the minor compaction 
and the clean abort one.
+    runCleaner(conf);
+
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 0, count);
+
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 0, count);
+
+  }
+
+  private HiveStreamingConnection prepareTableAndConnection(String dbName, 
String tblName, int batchSize) throws Exception {

Review comment:
       added




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 497358)
    Time Spent: 9.5h  (was: 9h 20m)

> Make sure transactions get cleaned if they are aborted before addPartitions 
> is called
> -------------------------------------------------------------------------------------
>
>                 Key: HIVE-21052
>                 URL: https://issues.apache.org/jira/browse/HIVE-21052
>             Project: Hive
>          Issue Type: Bug
>          Components: Transactions
>    Affects Versions: 3.0.0, 3.1.1
>            Reporter: Jaume M
>            Assignee: Jaume M
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: Aborted Txn w_Direct Write.pdf, HIVE-21052.1.patch, 
> HIVE-21052.10.patch, HIVE-21052.11.patch, HIVE-21052.12.patch, 
> HIVE-21052.2.patch, HIVE-21052.3.patch, HIVE-21052.4.patch, 
> HIVE-21052.5.patch, HIVE-21052.6.patch, HIVE-21052.7.patch, 
> HIVE-21052.8.patch, HIVE-21052.9.patch
>
>          Time Spent: 9.5h
>  Remaining Estimate: 0h
>
> If the transaction is aborted between openTxn and addPartitions and data has 
> been written on the table the transaction manager will think it's an empty 
> transaction and no cleaning will be done.
> This is currently an issue in the streaming API and in micromanaged tables. 
> As proposed by [~ekoifman] this can be solved by:
> * Writing an entry with a special marker to TXN_COMPONENTS at openTxn and 
> when addPartitions is called remove this entry from TXN_COMPONENTS and add 
> the corresponding partition entry to TXN_COMPONENTS.
> * If the cleaner finds and entry with a special marker in TXN_COMPONENTS that 
> specifies that a transaction was opened and it was aborted it must generate 
> jobs for the worker for every possible partition available.
> cc [~ewohlstadter]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to