egor-ryashin commented on a change in pull request #6370: Introduce SegmentId class
URL: https://github.com/apache/incubator-druid/pull/6370#discussion_r224028256
##########
File path: server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
##########
@@ -148,68 +154,63 @@ public void start()
final Duration delay = config.get().getPollDuration().toStandardDuration();
exec.scheduleWithFixedDelay(
- new Runnable()
- {
- @Override
- public void run()
- {
- // poll() is synchronized together with start(), stop() and isStarted() to ensure that when stop() exists,
- // poll() won't actually run anymore after that (it could only enter the syncrhonized section and exit
- // immediately because the localStartedOrder doesn't match the new currentStartOrder). It's needed
- // to avoid flakiness in SQLMetadataSegmentManagerTest.
- // See https://github.com/apache/incubator-druid/issues/6028
- readLock.lock();
- try {
- if (localStartOrder == currentStartOrder) {
- poll();
- }
- }
- catch (Exception e) {
- log.makeAlert(e, "uncaught exception in segment manager polling thread").emit();
-
- }
- finally {
- readLock.unlock();
- }
- }
- },
+ createPollTaskForStartOrder(localStartOrder),
0,
delay.getMillis(),
TimeUnit.MILLISECONDS
);
}
finally {
- writeLock.unlock();
+ lock.unlock();
}
}
+ private Runnable createPollTaskForStartOrder(long startOrder)
+ {
+ return () -> {
+ // poll() is synchronized together with start(), stop() and isStarted() to ensure that when stop() exits, poll()
+ // won't actually run anymore after that (it could only enter the syncrhonized section and exit immediately
+ // because the localStartedOrder doesn't match the new currentStartOrder). It's needed to avoid flakiness in
+ // SQLMetadataSegmentManagerTest. See https://github.com/apache/incubator-druid/issues/6028
+ ReentrantReadWriteLock.ReadLock lock = startStopLock.readLock();
+ lock.lock();
+ try {
+ if (startOrder == currentStartOrder) {
+ poll();
+ }
+ }
+ catch (Exception e) {
+ log.makeAlert(e, "uncaught exception in segment manager polling thread").emit();
+ }
+ finally {
+ lock.unlock();
+ }
+ };
+ }
+
@Override
@LifecycleStop
public void stop()
{
- writeLock.lock();
+ ReentrantReadWriteLock.WriteLock lock = startStopLock.writeLock();
+ lock.lock();
try {
if (!isStarted()) {
Review comment:
If a thread calling start() is waiting on the lock here, it will proceed to
start the service once stop() releases the lock, which I expect will cause an NPE.
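
The interleaving described in this comment can be reproduced in isolation. The sketch below is not the Druid class: `Service`, its `started` flag, and the `beforeUnlock` hook are hypothetical stand-ins, added only to make the ordering deterministic. It shows that a start() call queued on the write lock runs as soon as stop() unlocks, so the service ends up started after stop() has returned. Whether that leads to an NPE in SQLMetadataSegmentManager depends on which fields stop() clears, which this sketch does not model.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class StartStopOrdering
{
  /** Hypothetical stand-in for a start/stop-managed service; not the Druid class. */
  static final class Service
  {
    private final ReentrantReadWriteLock startStopLock = new ReentrantReadWriteLock();
    private boolean started; // guarded by startStopLock

    void start()
    {
      ReentrantReadWriteLock.WriteLock lock = startStopLock.writeLock();
      lock.lock();
      try {
        started = true;
      }
      finally {
        lock.unlock();
      }
    }

    /** beforeUnlock is a hook (an assumption of this sketch) that runs while stop() still holds the lock. */
    void stop(Runnable beforeUnlock)
    {
      ReentrantReadWriteLock.WriteLock lock = startStopLock.writeLock();
      lock.lock();
      try {
        started = false;
        beforeUnlock.run();
      }
      finally {
        lock.unlock();
      }
    }

    boolean isStarted()
    {
      ReentrantReadWriteLock.ReadLock lock = startStopLock.readLock();
      lock.lock();
      try {
        return started;
      }
      finally {
        lock.unlock();
      }
    }

    boolean hasWaiters()
    {
      return startStopLock.hasQueuedThreads();
    }
  }

  /** Returns the started flag seen after stop() has returned and the queued start() has finished. */
  static boolean startedAfterStop()
  {
    Service service = new Service();
    Thread starter = new Thread(service::start);
    service.stop(() -> {
      starter.start();
      // Wait until start() is queued on the write lock that stop() is holding.
      while (!service.hasWaiters()) {
        Thread.yield();
      }
    });
    try {
      starter.join();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return service.isStarted();
  }

  public static void main(String[] args)
  {
    System.out.println("started after stop(): " + startedAfterStop());
  }
}
```

Running `main` prints `started after stop(): true`. The write lock only serializes the two calls; it does not stop a late start() from undoing a stop() that has already completed.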