kfaraz commented on code in PR #16393:
URL: https://github.com/apache/druid/pull/16393#discussion_r1592653118
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1271,6 +1277,102 @@ public void remove(final Task task)
}
}
+ /**
+ * Acquire a read lock to perform the segment transactional append action
for a given datasource.
+ * Also verifies that all the locks are of the type APPEND for the task.
+ * @param task task to perform the append action
+ * @param lockAcquireTimeoutMillis milliseconds to wait for lock acquisition
+ */
+ public void acquireTransactionalAppendLock(Task task, long
lockAcquireTimeoutMillis)
+ {
+ final String datasource = task.getDataSource();
+ final ReentrantReadWriteLock readWriteLock =
+ datasourceToConcurrentLock.computeIfAbsent(datasource, ds -> new
ReentrantReadWriteLock(true));
+
+ synchronized (readWriteLock) {
+ try {
+ final boolean acquired = readWriteLock.readLock()
+
.tryLock(lockAcquireTimeoutMillis, TimeUnit.MILLISECONDS);
+ if (!acquired) {
+ throw DruidException
+ .forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.TIMEOUT)
+ .build("Timed out while acquiring transactional append lock for
datasource[%s].", datasource);
+ }
+ }
+ catch (InterruptedException e) {
+ throw new ISE(e, "Interrupted while acquiring transactional append
lock for datasource[%s].", datasource);
+ }
+ }
+ }
+
+ /**
+ * Acquire a write lock to perform the segment transactional replace action
for a given datasource.
+ * Also verifies that all the locks are of the type REPLACE for the task.
+ * @param task task to perform the replace action
+ * @param lockAcquireTimeoutMillis milliseconds to wait for lock acquisition
+ */
+ public void acquireTransactionalReplaceLock(Task task, long
lockAcquireTimeoutMillis)
+ {
+ final String datasource = task.getDataSource();
+ final ReentrantReadWriteLock readWriteLock =
+ datasourceToConcurrentLock.computeIfAbsent(datasource, ds -> new
ReentrantReadWriteLock(true));
+
+ synchronized (readWriteLock) {
+ try {
+ final boolean acquired = readWriteLock.writeLock()
+
.tryLock(lockAcquireTimeoutMillis, TimeUnit.MILLISECONDS);
+ if (!acquired) {
+ throw DruidException
+ .forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.TIMEOUT)
+ .build("Timed out while acquiring transactional replace lock for
datasource[%s].", datasource);
+ }
+ }
+ catch (InterruptedException e) {
+ throw new ISE(e, "Interrupted while acquiring transactional replace
lock for datasource[%s].", datasource);
+ }
+ }
+ }
+
+ /**
+ * Release the transactional append lock acquired by a task
+ * @param task task to perform the append action
+ */
+ public void releaseTransactionalAppendLock(Task task)
+ {
+ final String datasource = task.getDataSource();
+ if (!datasourceToConcurrentLock.containsKey(datasource)) {
+ return;
+ }
+ final ReentrantReadWriteLock readWriteLock =
datasourceToConcurrentLock.get(datasource);
Review Comment:
These two operations should be merged. Otherwise, there can be race
conditions and by the time we do the `get`, the entry might have already been
removed from the map.
Instead of this, just do a `get` followed by a null check of the value.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]