zabetak commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r799803119
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -685,11 +672,16 @@ void open(CompactionInfo ci) throws TException {
+ "}, status {" + res.getState() + "}, reason {" +
res.getErrorMessage() + "}");
}
lockId = res.getLockid();
-
- heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
+ heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(
+ CompactorUtil.createThreadFactory(
+ "CompactionTxn Heartbeater - " + txnId,
Thread.MIN_PRIORITY, true));
Review comment:
nit: I would keep the thread name unchanged, just to avoid potentially breaking people's scripts when analyzing stack traces, etc.
Review comment:
nit: You could even avoid introducing a new utility method (which is used in only one place) and the dependency on Guava via:
```
heartbeatExecutor = Executors.newSingleThreadScheduledExecutor((runnable) ->
{
Thread t = new Thread(runnable);
t.setDaemon(true);
t.setName("CompactionHeartbeater-" + txnId);
t.setPriority(Thread.MIN_PRIORITY);
return t;
});
```
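A minimal, self-contained sketch of the lambda-based factory suggested above, runnable without Guava. It keeps the PR's `"CompactionTxn Heartbeater - "` prefix in one place through a hypothetical constant `HEARTBEATER_THREAD_PREFIX` (not a name from the Hive code base), so the name seen in stack traces and the name the test searches for cannot drift apart. `newHeartbeatExecutor` is likewise a hypothetical helper for the demo.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

public class HeartbeaterThreadFactorySketch {
  // Hypothetical constant: the single source of truth for the heartbeater thread name.
  static final String HEARTBEATER_THREAD_PREFIX = "CompactionTxn Heartbeater - ";

  // Hypothetical helper: builds the heartbeat executor with a plain lambda ThreadFactory.
  static ScheduledExecutorService newHeartbeatExecutor(long txnId) {
    ThreadFactory factory = runnable -> {
      Thread t = new Thread(runnable);
      // Daemon, so a leaked heartbeater cannot keep the JVM alive.
      t.setDaemon(true);
      t.setName(HEARTBEATER_THREAD_PREFIX + txnId);
      t.setPriority(Thread.MIN_PRIORITY);
      return t;
    };
    return Executors.newSingleThreadScheduledExecutor(factory);
  }

  public static void main(String[] args) throws Exception {
    ScheduledExecutorService executor = newHeartbeatExecutor(42L);
    try {
      // Run one task that returns the thread executing it, then inspect that thread.
      Callable<Thread> whoRuns = Thread::currentThread;
      Thread worker = executor.submit(whoRuns).get();
      // prints: CompactionTxn Heartbeater - 42 daemon=true priority=1
      System.out.println(worker.getName() + " daemon=" + worker.isDaemon()
          + " priority=" + worker.getPriority());
    } finally {
      executor.shutdownNow();
    }
  }
}
```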
Review comment:
No strong feelings, up to you :)
##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -170,6 +173,67 @@ public void tearDown() {
}
}
+
+ @Test
+ public void testHeartbeatShutdownOnFailedCompaction() throws Exception {
+ String dbName = "default";
+ String tblName = "compaction_test";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " PARTITIONED BY(bkt INT)" +
+ " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+
+ StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+ .withFieldDelimiter(',')
+ .build();
+ HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+ .withDatabase(dbName)
+ .withTable(tblName)
+ .withStaticPartitionValues(Arrays.asList("0"))
+ .withAgentInfo("UT_" + Thread.currentThread().getName())
+ .withHiveConf(conf)
+ .withRecordWriter(writer)
+ .connect();
+ connection.beginTransaction();
+ connection.write("55, 'London'".getBytes());
+ connection.commitTransaction();
+ connection.beginTransaction();
+ connection.write("56, 'Paris'".getBytes());
+ connection.commitTransaction();
+ connection.close();
+
+ executeStatementOnDriver("INSERT INTO TABLE " + tblName + " PARTITION(bkt=1)" +
+ " values(57, 'Budapest')", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tblName + " PARTITION(bkt=1)" +
+ " values(58, 'Milano')", driver);
+ execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
+ tblName + " after load:");
+
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+
+ // Commit will throw an exception
+ IMetaStoreClient mockedClient = Mockito.spy(new HiveMetaStoreClient(conf));
+ doThrow(new RuntimeException("Simulating RuntimeException from CompactionTxn.commit")).when(mockedClient).commitTxn(Mockito.anyLong());
+
+ //Do a major compaction
+ CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
+ rqst.setPartitionname("bkt=0");
+ txnHandler.compact(rqst);
+
+ Worker worker = Mockito.spy(new Worker());
+ worker.setThreadId((int) worker.getId());
+ worker.setConf(conf);
+ worker.init(new AtomicBoolean(true));
+ FieldSetter.setField(worker, RemoteCompactorThread.class.getDeclaredField("msc"), mockedClient);
+
+ worker.run();
+
+ //Check if the heartbeating is properly terminated
+ Assert.assertTrue(Thread.getAllStackTraces().keySet()
+ .stream().noneMatch(k -> k.getName().contains("CompactionTxn Heartbeater")));
Review comment:
Maybe an assertion above, checking that the heartbeater really runs at some point, could be useful.
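A minimal sketch of such a positive assertion, assuming the test can capture the heartbeater thread through its ThreadFactory (the capture list, `PREFIX`, and the stand-in heartbeat task are hypothetical; the real test goes through `Worker` and `CompactionTxn`). It first waits until the heartbeat has fired at least once and its thread is visible, and only then shuts the executor down. As far as I understand `ThreadPoolExecutor`, `awaitTermination()` returning true does not guarantee that the worker thread has fully exited, so joining the captured thread makes the final "no heartbeater thread" check deterministic.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HeartbeatLifecycleCheckSketch {
  // Hypothetical prefix, mirroring the name used in the PR.
  static final String PREFIX = "CompactionTxn Heartbeater - ";

  static boolean anyLiveThreadNamed(String prefix) {
    return Thread.getAllStackTraces().keySet().stream()
        .anyMatch(t -> t.getName().startsWith(prefix));
  }

  // Returns true if the heartbeater was observed running and is gone after shutdown.
  static boolean runAndCheck() throws InterruptedException {
    List<Thread> created = new CopyOnWriteArrayList<>();
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
      Thread t = new Thread(r, PREFIX + 1L);
      t.setDaemon(true);
      created.add(t); // capture the thread so the check can join it later
      return t;
    });
    CountDownLatch firstBeat = new CountDownLatch(1);
    // Stand-in for the real heartbeat task: it only signals that it has run.
    executor.scheduleAtFixedRate(firstBeat::countDown, 0, 100, TimeUnit.MILLISECONDS);

    // Positive check: the heartbeat ran at least once and its thread is visible.
    boolean ran = firstBeat.await(5, TimeUnit.SECONDS) && anyLiveThreadNamed(PREFIX);

    executor.shutdownNow();
    boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
    // The worker may still be exiting when awaitTermination() returns, so join it.
    for (Thread t : created) {
      t.join(5000);
    }
    boolean stopped = created.stream().noneMatch(Thread::isAlive)
        && !anyLiveThreadNamed(PREFIX);
    return ran && terminated && stopped;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runAndCheck()); // prints: true
  }
}
```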
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -657,24 +649,19 @@ private String getWorkerId() {
/**
* Keep track of the compaction's transaction and its operations.
*/
- private class CompactionTxn implements AutoCloseable {
+ class CompactionTxn implements AutoCloseable {
Review comment:
Do we need to change visibility?
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -749,44 +740,39 @@ long getLockId() {
return lockId;
}
+ boolean isHeartbeatTerminated() {
+ return heartbeatExecutor == null || heartbeatExecutor.isTerminated();
+ }
+
@Override public String toString() {
return "txnId=" + txnId + ", lockId=" + lockId + " (TxnStatus: " +
status + ")";
}
/**
* Commit the txn if open.
*/
- private void commit() {
- if (msc == null) {
- LOG.error("Metastore client was null. Could not commit txn " + this);
- return;
- }
+ private void commit() throws TException {
if (status == TxnStatus.OPEN) {
- try {
- msc.commitTxn(txnId);
- status = TxnStatus.COMMITTED;
- } catch (TException e) {
- LOG.error("Caught an exception while committing compaction txn in worker " + workerName, e);
- }
+ msc.commitTxn(txnId);
Review comment:
@veghlaci05 have you seen my comment above?
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -715,28 +705,26 @@ void wasSuccessful() {
* @throws Exception
*/
@Override public void close() throws Exception {
+ //the transaction is about to close, we can stop heartbeating regardless of it's state
+ shutdownHeartbeater();
if (status != TxnStatus.UNKNOWN) {
- // turn off error logging in heartbeater in case of race condition between commit/abort and heartbeating
- heartbeater.shouldLogError(false);
if (succeessfulCompaction) {
commit();
} else {
abort();
}
}
- shutdownHeartbeater();
}
private void shutdownHeartbeater() {
if (heartbeatExecutor != null) {
heartbeatExecutor.shutdownNow();
try {
if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
- heartbeatExecutor.shutdownNow();
+ LOG.debug("Heartbeating for transaction {} did not stop in 5 seconds, do not wait any longer.", this);
}
- LOG.debug("Successfully stopped heartbeating for transaction {}", this);
} catch (InterruptedException ex) {
- heartbeatExecutor.shutdownNow();
+ //Caller thread was interrupted while waiting for heartbeater to terminate, nothing to do
Review comment:
If there is an interrupt during `heartbeatExecutor#awaitTermination`, we will never see another log (nor stack trace) showing that there was an attempt to shut down the heartbeater. Moreover, I don't feel very comfortable about completely swallowing an InterruptedException. The least that I would expect here is:
`Thread.currentThread().interrupt();`
but if you have thoroughly thought about it and it is not necessary, then I trust your judgement.
TL;DR
Citing "Java Concurrency in Practice":

Propagate the InterruptedException. This is often the most sensible policy if you can get away with it: just propagate the InterruptedException to your caller. This could involve not catching InterruptedException, or catching it and throwing it again after performing some brief activity-specific cleanup.

Restore the interrupt. Sometimes you cannot throw InterruptedException, for instance when your code is part of a Runnable. In these situations, you must catch InterruptedException and restore the interrupted status by calling interrupt on the current thread, so that code higher up the call stack can see that an interrupt was issued, as demonstrated in Listing 5.10.

You can get much more sophisticated with interruption, but these two approaches should work in the vast majority of situations. But there is one thing you should not do with InterruptedException: catch it and do nothing in response. This deprives code higher up on the call stack of the opportunity to act on the interruption, because the evidence that the thread was interrupted is lost. The only situation in which it is acceptable to swallow an interrupt is when you are extending Thread and therefore control all the code higher up on the call stack. Cancellation and interruption are covered in greater detail in Chapter 7.
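A minimal sketch of the "restore the interrupt" policy applied to the shutdown logic in the diff. This is not the PR's code: `shutdownHeartbeater` here is a standalone stand-in taking the executor and a txn description as parameters, and `java.util.logging` is assumed in place of Hive's logger. The catch block leaves a log line showing that the shutdown attempt was interrupted (the missing-log concern above) and then calls `Thread.currentThread().interrupt()`, so callers higher up the stack can still observe the interrupt. The hypothetical `interruptSurvives()` helper uses a task that deliberately ignores interrupts, which forces `awaitTermination()` to actually wait and therefore to throw when the caller is already interrupted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

public class ShutdownHeartbeaterSketch {
  private static final Logger LOG = Logger.getLogger(ShutdownHeartbeaterSketch.class.getName());

  // Hypothetical stand-in for CompactionTxn#shutdownHeartbeater.
  static void shutdownHeartbeater(ScheduledExecutorService heartbeatExecutor, String txn) {
    if (heartbeatExecutor == null) {
      return;
    }
    heartbeatExecutor.shutdownNow();
    try {
      if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
        LOG.fine("Heartbeating for " + txn + " did not stop in 5 seconds, not waiting any longer.");
      }
    } catch (InterruptedException ex) {
      // Leave a trace of the interrupted shutdown attempt, then restore the
      // interrupt status instead of swallowing it.
      LOG.warning("Interrupted while waiting for the heartbeater of " + txn + " to stop");
      Thread.currentThread().interrupt();
    }
  }

  // Returns true if the caller's interrupt status survives shutdownHeartbeater().
  static boolean interruptSurvives() throws InterruptedException {
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    // A stubborn task that ignores interrupts, so awaitTermination() has to wait.
    executor.execute(() -> {
      started.countDown();
      while (true) {
        try {
          release.await();
          return;
        } catch (InterruptedException ignored) {
          // Deliberately ignored for the demo only.
        }
      }
    });
    started.await();
    Thread.currentThread().interrupt();       // simulate an interrupt of the caller
    shutdownHeartbeater(executor, "txnId=1");
    boolean restored = Thread.interrupted();  // reads and clears the flag
    release.countDown();                      // let the stubborn task finish
    executor.awaitTermination(5, TimeUnit.SECONDS);
    return restored;
  }

  public static void main(String[] args) throws InterruptedException {
    // prints "interrupt restored: true" (the warning goes to stderr)
    System.out.println("interrupt restored: " + interruptSurvives());
  }
}
```

Propagating the exception instead would require `shutdownHeartbeater()` to declare it; since `close()` already declares `throws Exception`, that is possible too, but restoring the flag is the smaller change.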
--
This is an automated message from the Apache Git Service.