leventov commented on a change in pull request #7038: Fix and document
concurrency of EventReceiverFirehose and TimedShutoffFirehose; Refine
concurrency specification of Firehose
URL: https://github.com/apache/incubator-druid/pull/7038#discussion_r255666681
##########
File path:
server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
##########
@@ -165,59 +168,141 @@ public int getBufferSize()
@JsonProperty
public long getMaxIdleTime()
{
- return maxIdleTime;
+ return maxIdleTimeMillis;
}
+ /**
+ * Apart from adhering to {@link Firehose} contract regarding concurrency,
this class has two methods that might be
+ * called concurrently with any other methods and each other, from arbitrary
number of threads: {@link #addAll} and
+ * {@link #shutdown}.
+ *
+ * This class creates and manages one thread for calling {@link #close()}
asynchronously in response to a {@link
+ * #shutdown} request, or after this Firehose has been idle (no calls to
{@link #addAll}) for {@link
+ * #maxIdleTimeMillis}.
+ */
+ @VisibleForTesting
public class EventReceiverFirehose implements ChatHandler, Firehose,
EventReceiverFirehoseMetric
{
- private final ScheduledExecutorService exec;
- private final ExecutorService idleDetector;
- private final BlockingQueue<InputRow> buffer;
- private final InputRowParser<Map<String, Object>> parser;
+ /**
+ * This field needs to be volatile because it's intialized via
double-checked locking in {@link #shutdown}. See
+ *
https://github.com/apache/incubator-druid/pull/6662#discussion_r254161160.
+ */
+ private volatile @Nullable Thread delayedCloseExecutor;
- private final Object readLock = new Object();
+ /** Contains {@link InputRow} objects, the last one is {@link
#FIREHOSE_CLOSED} which is a "poison pill". */
+ private final BlockingQueue<Object> buffer;
+ private final InputRowParser<Map<String, Object>> parser;
- private volatile InputRow nextRow = null;
+ /** This field needs to be volatile to ensure progress in {@link #addRows}
method where it is read in a loop. */
private volatile boolean closed = false;
+
+ /**
+ * This field and {@link #rowsRunOut} are not volatile because they are
accessed only from {@link #hasMore()} and
+ * {@link #nextRow()} methods that are called from a single thread
according to {@link Firehose} spec.
+ */
+ private InputRow nextRow = null;
+ private boolean rowsRunOut = false;
+
private final AtomicLong bytesReceived = new AtomicLong(0);
- private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0);
+ private final AtomicLong lastBufferAddFailLoggingTimeNs = new
AtomicLong(System.nanoTime());
private final ConcurrentHashMap<String, Long> producerSequences = new
ConcurrentHashMap<>();
- private final Stopwatch idleWatch = Stopwatch.createUnstarted();
- public EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
+ /**
+ * This field and {@link #requestedShutdownTimeNsHolder} use nanoseconds
instead of milliseconds not to deal with
+ * the fact that {@link System#currentTimeMillis()} can "go backward", e.
g. due to time correction on the server.
+ */
+ private final AtomicReference<Long> idleCloseTimeNsHolder = new
AtomicReference<>();
+ private final AtomicReference<Long> requestedShutdownTimeNsHolder = new
AtomicReference<>();
+
+ EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
{
this.buffer = new ArrayBlockingQueue<>(bufferSize);
this.parser = parser;
- exec = Execs.scheduledSingleThreaded("event-receiver-firehose-%d");
- idleDetector =
Execs.singleThreaded("event-receiver-firehose-idle-detector-%d");
- idleDetector.submit(() -> {
- long idled;
- try {
- while ((idled = idleWatch.elapsed(TimeUnit.MILLISECONDS)) <
maxIdleTime) {
- Thread.sleep(maxIdleTime - idled);
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
+
+ if (maxIdleTimeMillis != Long.MAX_VALUE) {
+ idleCloseTimeNsHolder.set(System.nanoTime() +
TimeUnit.MILLISECONDS.toNanos(maxIdleTimeMillis));
+ synchronized (this) {
+ createDelayedCloseExecutor();
}
- log.info("Firehose has been idle for %d ms, closing.", idled);
- close();
- });
- idleWatch.start();
+ }
}
+ @GuardedBy("this")
+ private Thread createDelayedCloseExecutor()
+ {
+ Thread delayedCloseExecutor = new Thread(
+ () -> {
+ // The closed = true is visible after close() because there is a
happens-before edge between
+ // delayedCloseExecutor.interrupt() call in close() and catching
InterruptedException below in this loop.
+ while (!closed) {
+ Long closeTimeNs = null;
+ Boolean dueToShutdownRequest = null;
+ Long idleCloseTimeNs = idleCloseTimeNsHolder.get();
+ if (idleCloseTimeNs != null) {
+ closeTimeNs = idleCloseTimeNs;
+ dueToShutdownRequest = false;
+ }
+ Long requestedShutdownTimeNs =
requestedShutdownTimeNsHolder.get();
+ if (requestedShutdownTimeNs != null) {
+ if (closeTimeNs == null || requestedShutdownTimeNs -
closeTimeNs <= 0) { // overflow-aware comparison
+ closeTimeNs = requestedShutdownTimeNs;
+ dueToShutdownRequest = true;
+ }
+ }
+ if (closeTimeNs == null) {
+ log.error(
+ "A bug in EventReceiverFirehose code, "
+ + "either idleCloseTimeNs or requestedShutdownTimeNs
should be non-null"
+ );
+ try {
+ Threads.sleepFor(1, TimeUnit.MINUTES);
+ }
+ catch (InterruptedException ignore) {
+ // Interruption is a wakeup, continue the loop
+ }
+ continue;
+ }
+ long closeTimeoutNs = closeTimeNs - System.nanoTime();
+ if (closeTimeoutNs <= 0) {
+ if (dueToShutdownRequest) {
+ log.info("Closing Firehose after a shutdown request");
+ } else {
+ log.info("Firehose has been idle for %d ms, closing.",
maxIdleTimeMillis);
+ }
+ close();
+ return;
+ } else {
+ try {
+ Threads.sleepFor(closeTimeoutNs, TimeUnit.NANOSECONDS);
+ }
+ catch (InterruptedException ignore) {
+ // Interruption is a wakeup, continue the loop
+ }
+ }
+ }
+ },
+ "event-receiver-firehose-closer"
Review comment:
I actually spent a fair amount of time trying different approaches, and this
version emerged as the one that stays robust in the face of arbitrary jumps in
the shutoff time and the idle timeout, regardless of whether the closer thread
is created from the constructor or lazily from `shutdown()`. But I'm open to
concrete suggestions about how this method can be improved.
I added a Javadoc comment to this method that describes the behavior of the
thread (and its interruption policy) at a high level. Is it clearer with the
comment?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]