zabetak commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r797437477



##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
##########
@@ -88,4 +90,13 @@ static String getCompactorJobQueueName(HiveConf conf, 
CompactionInfo ci, Table t
     }
     return null;
   }
+
+  public static ThreadFactory createThreadFactory(String threadNameFormat) {
+    return new ThreadFactoryBuilder()
+            .setPriority(Thread.currentThread().getPriority())
+            .setDaemon(Thread.currentThread().isDaemon())

Review comment:
       Do we want the heartbeater to run with the same options as the current 
thread? From the code below, it seems that it was previously a daemon thread 
with min priority.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -207,26 +206,16 @@ static void gatherStats(CompactionInfo ci, HiveConf conf, 
String userName, Strin
     }
   }
 
-  static final class CompactionHeartbeater extends Thread {
+  static final class CompactionHeartbeater implements Runnable {
     static final private Logger LOG = 
LoggerFactory.getLogger(CompactionHeartbeater.class);
     private final CompactionTxn compactionTxn;
     private final String tableName;
     private final HiveConf conf;
-    private final AtomicBoolean errorLogEnabled;
 
     public CompactionHeartbeater(CompactionTxn compactionTxn, String 
tableName, HiveConf conf) {
       this.tableName = Objects.requireNonNull(tableName);
       this.compactionTxn = Objects.requireNonNull(compactionTxn);
       this.conf = Objects.requireNonNull(conf);
-      this.errorLogEnabled = new AtomicBoolean(true);
-
-      setDaemon(true);
-      setPriority(MIN_PRIORITY);
-      setName("CompactionHeartbeater-" + compactionTxn.getTxnId());
-    }
-
-    public void shouldLogError(boolean shouldLogError) {
-      this.errorLogEnabled.set(shouldLogError);

Review comment:
       Why was it necessary to have conditional error logging in HIVE-25740?

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -170,6 +174,73 @@ public void tearDown() {
     }
   }
 
+
+  @Test
+  public void testHeartbeatShutdownOnFailedCompaction() throws Exception {
+    String dbName = "default";
+    String tblName = "compaction_test";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+            " PARTITIONED BY(bkt INT)" +
+            " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires 
table to be bucketed
+            " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+            .withFieldDelimiter(',')
+            .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+            .withDatabase(dbName)
+            .withTable(tblName)
+            .withStaticPartitionValues(Arrays.asList("0"))
+            .withAgentInfo("UT_" + Thread.currentThread().getName())
+            .withHiveConf(conf)
+            .withRecordWriter(writer)
+            .connect();
+    connection.beginTransaction();
+    connection.write("55, 'London'".getBytes());
+    connection.commitTransaction();
+    connection.beginTransaction();
+    connection.write("56, 'Paris'".getBytes());
+    connection.commitTransaction();
+    connection.close();
+
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " 
PARTITION(bkt=1)" +
+            " values(57, 'Budapest')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " 
PARTITION(bkt=1)" +
+            " values(58, 'Milano')", driver);
+    execSelectAndDumpData("select * from " + tblName, driver, "Dumping data 
for " +
+            tblName + " after load:");
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+
+    // Commit will throw an exception
+    IMetaStoreClient mockedClient = Mockito.spy(new HiveMetaStoreClient(conf));
+    doThrow(new RuntimeException("Simulating RuntimeException from 
CompactionTxn.commit")).when(mockedClient).commitTxn(Mockito.anyLong());
+
+    //Do a major compaction
+    CompactionRequest rqst = new CompactionRequest(dbName, tblName, 
CompactionType.MAJOR);
+    rqst.setPartitionname("bkt=0");
+    txnHandler.compact(rqst);
+
+    Worker worker = Mockito.spy(new Worker());
+    worker.setThreadId((int) worker.getId());
+    worker.setConf(conf);
+    worker.init(new AtomicBoolean(true));
+    FieldSetter.setField(worker, 
RemoteCompactorThread.class.getDeclaredField("msc"), mockedClient);
+
+    AtomicReference<Worker.CompactionTxn> capture = new AtomicReference<>();
+    doAnswer(invocation -> {
+      Worker.CompactionTxn compactionTxn = (Worker.CompactionTxn) 
invocation.callRealMethod();
+      capture.set(compactionTxn);
+      return compactionTxn;
+    }).when(worker).newCompactionTxn();
+
+    worker.run();
+
+    //Check if the heartbeating is properly terminated
+    Assert.assertTrue(capture.get().isHeartbeatTerminated());

Review comment:
       Can't we simply use `Thread.getAllStackTraces()` to find out whether a 
thread is running or not? This would probably make `Mockito` redundant and it 
could possibly allow us to remove `isHeartbeatTerminated()`. Moreover, at the 
moment `isHeartbeatTerminated` will return true if the executor is `null`, so I 
am not sure if the assertion is completely valid.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -715,28 +705,26 @@ void wasSuccessful() {
      * @throws Exception
      */
     @Override public void close() throws Exception {
+      //the transaction is about to close, we can stop heartbeating regardless 
of it's state
+      shutdownHeartbeater();
       if (status != TxnStatus.UNKNOWN) {
-        // turn off error logging in heartbeater in case of race condition 
between commit/abort and heartbeating
-        heartbeater.shouldLogError(false);
         if (succeessfulCompaction) {
           commit();
         } else {
           abort();
         }
       }
-      shutdownHeartbeater();
     }
 
     private void shutdownHeartbeater() {
       if (heartbeatExecutor != null) {
         heartbeatExecutor.shutdownNow();
         try {
           if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
-            heartbeatExecutor.shutdownNow();
+            LOG.debug("Heartbeating for transaction {} did not stop in 5 
seconds, do not wait any longer.", this);

Review comment:
       This is a potential thread leak. Wouldn't it be better to log this as an 
error or at least a warning?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -749,44 +740,39 @@ long getLockId() {
       return lockId;
     }
 
+    boolean isHeartbeatTerminated() {
+      return heartbeatExecutor == null || heartbeatExecutor.isTerminated();
+    }
+
     @Override public String toString() {
       return "txnId=" + txnId + ", lockId=" + lockId + " (TxnStatus: " + 
status + ")";
     }
 
     /**
      * Commit the txn if open.
      */
-    private void commit() {
-      if (msc == null) {
-        LOG.error("Metastore client was null. Could not commit txn " + this);
-        return;
-      }
+    private void commit() throws TException {
       if (status == TxnStatus.OPEN) {
-        try {
-          msc.commitTxn(txnId);
-          status = TxnStatus.COMMITTED;
-        } catch (TException e) {
-          LOG.error("Caught an exception while committing compaction txn in 
worker " + workerName, e);
-        }
+        msc.commitTxn(txnId);

Review comment:
       I see places where we are explicitly setting `msc` to null. I don't know 
this part of the code well, but if the null checks are redundant, wouldn't it be 
better to handle that in a separate PR?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -715,28 +705,26 @@ void wasSuccessful() {
      * @throws Exception
      */
     @Override public void close() throws Exception {
+      //the transaction is about to close, we can stop heartbeating regardless 
of it's state
+      shutdownHeartbeater();
       if (status != TxnStatus.UNKNOWN) {
-        // turn off error logging in heartbeater in case of race condition 
between commit/abort and heartbeating
-        heartbeater.shouldLogError(false);
         if (succeessfulCompaction) {
           commit();
         } else {
           abort();
         }
       }
-      shutdownHeartbeater();
     }
 
     private void shutdownHeartbeater() {
       if (heartbeatExecutor != null) {
         heartbeatExecutor.shutdownNow();
         try {
           if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
-            heartbeatExecutor.shutdownNow();
+            LOG.debug("Heartbeating for transaction {} did not stop in 5 
seconds, do not wait any longer.", this);
           }
-          LOG.debug("Successfully stopped heartbeating for transaction {}", 
this);
         } catch (InterruptedException ex) {
-          heartbeatExecutor.shutdownNow();
+          //Caller thread was interrupted while waiting for heartbeater to 
terminate, nothing to do

Review comment:
       Probably worth logging instead of completely ignoring the interruption.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to