jon-wei closed pull request #5997: Add maxIdleTime option to
EventReceiverFirehose
URL: https://github.com/apache/incubator-druid/pull/5997
This is a pull request merged from a forked repository (a foreign pull
request). Because GitHub hides the original diff of such a pull request
once it is merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
index fc58d9d3be2..14b7a19a8ea 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
@@ -28,6 +28,7 @@
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -74,6 +75,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -88,9 +90,11 @@
private static final EmittingLogger log = new
EmittingLogger(EventReceiverFirehoseFactory.class);
private static final int DEFAULT_BUFFER_SIZE = 100_000;
+ private static final long DEFAULT_MAX_IDLE_TIME = Long.MAX_VALUE;
private final String serviceName;
private final int bufferSize;
+ private final long maxIdleTime;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
@@ -101,6 +105,7 @@
public EventReceiverFirehoseFactory(
@JsonProperty("serviceName") String serviceName,
@JsonProperty("bufferSize") Integer bufferSize,
+ @JsonProperty("maxIdleTime") Long maxIdleTime,
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject @Json ObjectMapper jsonMapper,
@JacksonInject @Smile ObjectMapper smileMapper,
@@ -112,6 +117,8 @@ public EventReceiverFirehoseFactory(
this.serviceName = serviceName;
this.bufferSize = bufferSize == null || bufferSize <= 0 ?
DEFAULT_BUFFER_SIZE : bufferSize;
+ this.maxIdleTime = maxIdleTime == null || maxIdleTime <= 0 ?
+ DEFAULT_MAX_IDLE_TIME : maxIdleTime;
this.chatHandlerProvider = Optional.ofNullable(chatHandlerProvider);
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
@@ -155,9 +162,16 @@ public int getBufferSize()
return bufferSize;
}
+ @JsonProperty
+ public long getMaxIdleTime()
+ {
+ return maxIdleTime;
+ }
+
public class EventReceiverFirehose implements ChatHandler, Firehose,
EventReceiverFirehoseMetric
{
private final ScheduledExecutorService exec;
+ private final ExecutorService idleDetector;
private final BlockingQueue<InputRow> buffer;
private final InputRowParser<Map<String, Object>> parser;
@@ -168,12 +182,29 @@ public int getBufferSize()
private final AtomicLong bytesReceived = new AtomicLong(0);
private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0);
private final ConcurrentMap<String, Long> producerSequences = new
ConcurrentHashMap<>();
+ private final Stopwatch idleWatch = Stopwatch.createUnstarted();
public EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
{
this.buffer = new ArrayBlockingQueue<>(bufferSize);
this.parser = parser;
exec = Execs.scheduledSingleThreaded("event-receiver-firehose-%d");
+ idleDetector =
Execs.singleThreaded("event-receiver-firehose-idle-detector-%d");
+ idleDetector.submit(() -> {
+ long idled;
+ try {
+ while ((idled = idleWatch.elapsed(TimeUnit.MILLISECONDS)) <
maxIdleTime) {
+ Thread.sleep(maxIdleTime - idled);
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ log.info("Firehose has been idle for %d ms, closing.", idled);
+ close();
+ });
+ idleWatch.start();
}
@POST
@@ -185,6 +216,8 @@ public Response addAll(
@Context final HttpServletRequest req
)
{
+ idleWatch.reset();
+ idleWatch.start();
Access accessResult = AuthorizationUtils.authorizeResourceAction(
req,
new ResourceAction(
@@ -328,6 +361,8 @@ public void close()
chatHandlerProvider.get().unregister(serviceName);
}
exec.shutdown();
+ idleDetector.shutdown();
+ idleWatch.stop();
}
}
diff --git
a/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehostIdleTest.java
b/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehostIdleTest.java
new file mode 100644
index 00000000000..18c289ebc9e
--- /dev/null
+++
b/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehostIdleTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime.firehose;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
+import org.apache.druid.server.metrics.EventReceiverFirehoseRegister;
+import org.apache.druid.server.security.AllowAllAuthenticator;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthTestUtils;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.servlet.http.HttpServletRequest;
+import java.util.Locale;
+
+public class EventReceiverFirehostIdleTest
+{
+ private static final int CAPACITY = 300;
+ private static final long MAX_IDLE_TIME = 5_000L;
+ private static final String SERVICE_NAME = "test_firehose";
+
+ private final String inputRow = "[{\n"
+ + " \"timestamp\":123,\n"
+ + " \"d1\":\"v1\"\n"
+ + "}]";
+
+ private EventReceiverFirehoseFactory eventReceiverFirehoseFactory;
+ private EventReceiverFirehoseFactory.EventReceiverFirehose firehose;
+ private EventReceiverFirehoseRegister register = new
EventReceiverFirehoseRegister();
+ private HttpServletRequest req;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ req = EasyMock.createMock(HttpServletRequest.class);
+ eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory(
+ SERVICE_NAME,
+ CAPACITY,
+ MAX_IDLE_TIME,
+ null,
+ new DefaultObjectMapper(),
+ new DefaultObjectMapper(),
+ register,
+ AuthTestUtils.TEST_AUTHORIZER_MAPPER
+ );
+ firehose = (EventReceiverFirehoseFactory.EventReceiverFirehose)
eventReceiverFirehoseFactory.connect(
+ new MapInputRowParser(
+ new JSONParseSpec(
+ new TimestampSpec(
+ "timestamp",
+ "auto",
+ null
+ ), new
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("d1")), null,
null),
+ null,
+ null
+ )
+ ),
+ null
+ );
+ }
+
+ @Test(timeout = 40_000L)
+ public void testIdle() throws Exception
+ {
+ Thread.sleep(8_000L);
+ Assert.assertTrue(firehose.isClosed());
+ }
+
+ @Test(timeout = 40_000L)
+ public void testNotIdle() throws Exception
+ {
+ EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED))
+ .andReturn(null)
+ .anyTimes();
+ EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH))
+ .andReturn(null)
+ .anyTimes();
+ EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+ .andReturn(AllowAllAuthenticator.ALLOW_ALL_RESULT)
+ .anyTimes();
+
EasyMock.expect(req.getHeader("X-Firehose-Producer-Id")).andReturn(null).anyTimes();
+
EasyMock.expect(req.getContentType()).andReturn("application/json").anyTimes();
+ req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.replay(req);
+
+ final int checks = 5;
+ for (int i = 0; i < checks; i++) {
+ Assert.assertFalse(firehose.isClosed());
+ System.out.printf(Locale.ENGLISH, "Check %d/%d passed\n", i + 1, checks);
+ firehose.addAll(IOUtils.toInputStream(inputRow), req);
+ Thread.sleep(3_000L);
+ }
+
+ Thread.sleep(5_000L);
+ Assert.assertTrue(firehose.isClosed());
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
index 62414e1228e..683c8ce061f 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
@@ -57,6 +57,7 @@
{
private static final int CAPACITY = 300;
private static final int NUM_EVENTS = 100;
+ private static final long MAX_IDLE_TIME = Long.MAX_VALUE;
private static final String SERVICE_NAME = "test_firehose";
private final String inputRow = "[{\n"
@@ -76,6 +77,7 @@ public void setUp()
eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory(
SERVICE_NAME,
CAPACITY,
+ MAX_IDLE_TIME,
null,
new DefaultObjectMapper(),
new DefaultObjectMapper(),
@@ -217,6 +219,7 @@ public void testDuplicateRegistering()
EventReceiverFirehoseFactory eventReceiverFirehoseFactory2 = new
EventReceiverFirehoseFactory(
SERVICE_NAME,
CAPACITY,
+ MAX_IDLE_TIME,
null,
new DefaultObjectMapper(),
new DefaultObjectMapper(),
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]