This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2fac674 Add maxIdleTime option to EventReceiverFirehose (#5997)
2fac674 is described below
commit 2fac6743d49574a6243f823cfddfe8add1124fea
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Sep 18 04:50:56 2018 +0800
Add maxIdleTime option to EventReceiverFirehose (#5997)
---
.../firehose/EventReceiverFirehoseFactory.java | 35 ++++++
.../firehose/EventReceiverFirehostIdleTest.java | 124 +++++++++++++++++++++
.../firehose/EventReceiverFirehoseTest.java | 3 +
3 files changed, 162 insertions(+)
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
index fc58d9d..14b7a19 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -74,6 +75,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -88,9 +90,11 @@ public class EventReceiverFirehoseFactory implements
FirehoseFactory<InputRowPar
private static final EmittingLogger log = new
EmittingLogger(EventReceiverFirehoseFactory.class);
private static final int DEFAULT_BUFFER_SIZE = 100_000;
+ private static final long DEFAULT_MAX_IDLE_TIME = Long.MAX_VALUE;
private final String serviceName;
private final int bufferSize;
+ private final long maxIdleTime;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
@@ -101,6 +105,7 @@ public class EventReceiverFirehoseFactory implements
FirehoseFactory<InputRowPar
public EventReceiverFirehoseFactory(
@JsonProperty("serviceName") String serviceName,
@JsonProperty("bufferSize") Integer bufferSize,
+ @JsonProperty("maxIdleTime") Long maxIdleTime,
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject @Json ObjectMapper jsonMapper,
@JacksonInject @Smile ObjectMapper smileMapper,
@@ -112,6 +117,8 @@ public class EventReceiverFirehoseFactory implements
FirehoseFactory<InputRowPar
this.serviceName = serviceName;
this.bufferSize = bufferSize == null || bufferSize <= 0 ?
DEFAULT_BUFFER_SIZE : bufferSize;
+ this.maxIdleTime = maxIdleTime == null || maxIdleTime <= 0 ?
+ DEFAULT_MAX_IDLE_TIME : maxIdleTime;
this.chatHandlerProvider = Optional.ofNullable(chatHandlerProvider);
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
@@ -155,9 +162,16 @@ public class EventReceiverFirehoseFactory implements
FirehoseFactory<InputRowPar
return bufferSize;
}
+ @JsonProperty
+ public long getMaxIdleTime()
+ {
+ return maxIdleTime;
+ }
+
public class EventReceiverFirehose implements ChatHandler, Firehose,
EventReceiverFirehoseMetric
{
private final ScheduledExecutorService exec;
+ private final ExecutorService idleDetector;
private final BlockingQueue<InputRow> buffer;
private final InputRowParser<Map<String, Object>> parser;
@@ -168,12 +182,29 @@ public class EventReceiverFirehoseFactory implements
FirehoseFactory<InputRowPar
private final AtomicLong bytesReceived = new AtomicLong(0);
private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0);
private final ConcurrentMap<String, Long> producerSequences = new
ConcurrentHashMap<>();
+ private final Stopwatch idleWatch = Stopwatch.createUnstarted();
public EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
{
this.buffer = new ArrayBlockingQueue<>(bufferSize);
this.parser = parser;
exec = Execs.scheduledSingleThreaded("event-receiver-firehose-%d");
+ idleDetector =
Execs.singleThreaded("event-receiver-firehose-idle-detector-%d");
+ idleDetector.submit(() -> {
+ long idled;
+ try {
+ while ((idled = idleWatch.elapsed(TimeUnit.MILLISECONDS)) <
maxIdleTime) {
+ Thread.sleep(maxIdleTime - idled);
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ log.info("Firehose has been idle for %d ms, closing.", idled);
+ close();
+ });
+ idleWatch.start();
}
@POST
@@ -185,6 +216,8 @@ public class EventReceiverFirehoseFactory implements
FirehoseFactory<InputRowPar
@Context final HttpServletRequest req
)
{
+ idleWatch.reset();
+ idleWatch.start();
Access accessResult = AuthorizationUtils.authorizeResourceAction(
req,
new ResourceAction(
@@ -328,6 +361,8 @@ public class EventReceiverFirehoseFactory implements
FirehoseFactory<InputRowPar
chatHandlerProvider.get().unregister(serviceName);
}
exec.shutdown();
+ idleDetector.shutdown();
+ idleWatch.stop();
}
}
diff --git
a/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehostIdleTest.java
b/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehostIdleTest.java
new file mode 100644
index 0000000..18c289e
--- /dev/null
+++
b/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehostIdleTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime.firehose;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
+import org.apache.druid.server.metrics.EventReceiverFirehoseRegister;
+import org.apache.druid.server.security.AllowAllAuthenticator;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthTestUtils;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.servlet.http.HttpServletRequest;
+import java.util.Locale;
+
+public class EventReceiverFirehostIdleTest
+{
+ private static final int CAPACITY = 300;
+ private static final long MAX_IDLE_TIME = 5_000L;
+ private static final String SERVICE_NAME = "test_firehose";
+
+ private final String inputRow = "[{\n"
+ + " \"timestamp\":123,\n"
+ + " \"d1\":\"v1\"\n"
+ + "}]";
+
+ private EventReceiverFirehoseFactory eventReceiverFirehoseFactory;
+ private EventReceiverFirehoseFactory.EventReceiverFirehose firehose;
+ private EventReceiverFirehoseRegister register = new
EventReceiverFirehoseRegister();
+ private HttpServletRequest req;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ req = EasyMock.createMock(HttpServletRequest.class);
+ eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory(
+ SERVICE_NAME,
+ CAPACITY,
+ MAX_IDLE_TIME,
+ null,
+ new DefaultObjectMapper(),
+ new DefaultObjectMapper(),
+ register,
+ AuthTestUtils.TEST_AUTHORIZER_MAPPER
+ );
+ firehose = (EventReceiverFirehoseFactory.EventReceiverFirehose)
eventReceiverFirehoseFactory.connect(
+ new MapInputRowParser(
+ new JSONParseSpec(
+ new TimestampSpec(
+ "timestamp",
+ "auto",
+ null
+ ), new
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("d1")), null,
null),
+ null,
+ null
+ )
+ ),
+ null
+ );
+ }
+
+ @Test(timeout = 40_000L)
+ public void testIdle() throws Exception
+ {
+ Thread.sleep(8_000L);
+ Assert.assertTrue(firehose.isClosed());
+ }
+
+ @Test(timeout = 40_000L)
+ public void testNotIdle() throws Exception
+ {
+ EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED))
+ .andReturn(null)
+ .anyTimes();
+ EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH))
+ .andReturn(null)
+ .anyTimes();
+ EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+ .andReturn(AllowAllAuthenticator.ALLOW_ALL_RESULT)
+ .anyTimes();
+
EasyMock.expect(req.getHeader("X-Firehose-Producer-Id")).andReturn(null).anyTimes();
+
EasyMock.expect(req.getContentType()).andReturn("application/json").anyTimes();
+ req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.replay(req);
+
+ final int checks = 5;
+ for (int i = 0; i < checks; i++) {
+ Assert.assertFalse(firehose.isClosed());
+ System.out.printf(Locale.ENGLISH, "Check %d/%d passed\n", i + 1, checks);
+ firehose.addAll(IOUtils.toInputStream(inputRow), req);
+ Thread.sleep(3_000L);
+ }
+
+ Thread.sleep(5_000L);
+ Assert.assertTrue(firehose.isClosed());
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
index 62414e1..683c8ce 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
@@ -57,6 +57,7 @@ public class EventReceiverFirehoseTest
{
private static final int CAPACITY = 300;
private static final int NUM_EVENTS = 100;
+ private static final long MAX_IDLE_TIME = Long.MAX_VALUE;
private static final String SERVICE_NAME = "test_firehose";
private final String inputRow = "[{\n"
@@ -76,6 +77,7 @@ public class EventReceiverFirehoseTest
eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory(
SERVICE_NAME,
CAPACITY,
+ MAX_IDLE_TIME,
null,
new DefaultObjectMapper(),
new DefaultObjectMapper(),
@@ -217,6 +219,7 @@ public class EventReceiverFirehoseTest
EventReceiverFirehoseFactory eventReceiverFirehoseFactory2 = new
EventReceiverFirehoseFactory(
SERVICE_NAME,
CAPACITY,
+ MAX_IDLE_TIME,
null,
new DefaultObjectMapper(),
new DefaultObjectMapper(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]