This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7a44f4a Manually trigger PeriodicTask (#7174)
7a44f4a is described below
commit 7a44f4af2c5e2cb5dfb96c33d2010efb8e5f1e74
Author: Amrish Lal <[email protected]>
AuthorDate: Mon Aug 23 12:06:14 2021 -0700
Manually trigger PeriodicTask (#7174)
* Don't allow a PeriodicTask to execute more than once at a time.
* Revert pom.xml change.
* Add controller message handler.
* Cleanup imports.
* Execute periodic task after controller receives message.
* Add test case.
* Additional log messages.
* Fix test case.
* Trigger task against specific table.
* Cleanup.
* Rebuild.
* Code review changes.
* Cleanup.
* Controller api to get list of available task names.
* Cleanup.
* Cleanup.
* Cleanup.
* Codereview changes.
* Cleanup.
* Controller API to set only one table and table type for manual periodic
task execution.
* Codereview changes.
* Cleanup.
* Codereview changes.
* Update test case to use countdown latch.
* Cleanup.
* Cleanup.
* Codereview changes.
* Add request id to link PeriodicTask execution request with its log
entries.
* Cleanup.
* Rebuild.
* Fix NPE.
* Cleanup.
* Codereview changes.
* Cleanup.
* Codereview changes.
* Rebuild.
* Fix test case.
* Rebuild.
* Fix test case.
* Codereview changes.
* Cleanup.
* Cleanup /periodictask/run API
* Cleanup.
* Pass Properties object on the stack.
* Codereview changes.
* Fix checkstyle violations.
* fix checkstyle.
---
.../common/messages/RunPeriodicTaskMessage.java | 66 ++++++++++
.../pinot/controller/BaseControllerStarter.java | 6 +
...ControllerUserDefinedMessageHandlerFactory.java | 134 +++++++++++++++++++++
.../pinot/controller/api/resources/Constants.java | 1 +
...PinotControllerPeriodicTaskRestletResource.java | 123 +++++++++++++++++++
.../core/minion/MinionInstancesCleanupTask.java | 3 +-
.../helix/core/minion/TaskMetricsEmitter.java | 3 +-
.../core/periodictask/ControllerPeriodicTask.java | 26 +++-
.../minion/MinionInstancesCleanupTaskTest.java | 9 +-
.../pinot/core/periodictask/BasePeriodicTask.java | 93 ++++++++------
.../pinot/core/periodictask/PeriodicTask.java | 11 ++
.../core/periodictask/PeriodicTaskScheduler.java | 62 +++++++++-
.../periodictask/PeriodicTaskSchedulerTest.java | 85 ++++++++++++-
13 files changed, 574 insertions(+), 48 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java
b/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java
new file mode 100644
index 0000000..11ee6cf
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import java.util.UUID;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.Message;
+
+
+/**
+ * Upon receiving this message, Controller will execute the specified
PeriodicTask against the tables for which it is
+ * the lead controller. The message is sent whenever API call for executing a
PeriodicTask is invoked.
+ */
+public class RunPeriodicTaskMessage extends Message {
+ public static final String RUN_PERIODIC_TASK_MSG_SUB_TYPE =
"RUN_PERIODIC_TASK";
+ private static final String PERIODIC_TASK_REQUEST_ID = "requestId";
+ private static final String PERIODIC_TASK_NAME_KEY = "taskName";
+ private static final String TABLE_NAME_WITH_TYPE_KEY = "tableNameWithType";
+
+ /**
+ * @param taskRequestId Request Id that will be appended to log messages.
+ * @param periodicTaskName Name of the task that will be run.
+ * @param tableNameWithType Table (names with type suffix) on which task
will run.
+ */
+ public RunPeriodicTaskMessage(String taskRequestId, String periodicTaskName,
String tableNameWithType) {
+ super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+ setMsgSubType(RUN_PERIODIC_TASK_MSG_SUB_TYPE);
+ setExecutionTimeout(-1);
+ ZNRecord znRecord = getRecord();
+ znRecord.setSimpleField(PERIODIC_TASK_REQUEST_ID, taskRequestId);
+ znRecord.setSimpleField(PERIODIC_TASK_NAME_KEY, periodicTaskName);
+ znRecord.setSimpleField(TABLE_NAME_WITH_TYPE_KEY, tableNameWithType);
+ }
+
+ public RunPeriodicTaskMessage(Message message) {
+ super(message.getRecord());
+ }
+
+ public String getPeriodicTaskRequestId() {
+ return getRecord().getSimpleField(PERIODIC_TASK_REQUEST_ID);
+ }
+
+ public String getPeriodicTaskName() {
+ return getRecord().getSimpleField(PERIODIC_TASK_NAME_KEY);
+ }
+
+ public String getTableNameWithType() {
+ return getRecord().getSimpleField(TABLE_NAME_WITH_TYPE_KEY);
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index b6471b1..39b9b1e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -411,6 +411,11 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
_periodicTaskScheduler.init(controllerPeriodicTasks);
_periodicTaskScheduler.start();
+ // Register message handler for incoming user-defined helix messages.
+ _helixParticipantManager.getMessagingService()
+
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
+ new
ControllerUserDefinedMessageHandlerFactory(_periodicTaskScheduler));
+
String accessControlFactoryClass = _config.getAccessControlFactoryClass();
LOGGER.info("Use class: {} as the AccessControlFactory",
accessControlFactoryClass);
final AccessControlFactory accessControlFactory;
@@ -443,6 +448,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
bind(accessControlFactory).to(AccessControlFactory.class);
bind(metadataEventNotifierFactory).to(MetadataEventNotifierFactory.class);
bind(_leadControllerManager).to(LeadControllerManager.class);
+ bind(_periodicTaskScheduler).to(PeriodicTaskScheduler.class);
}
});
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java
new file mode 100644
index 0000000..fd9dfb8
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller;
+
+import java.util.Properties;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
+import org.apache.pinot.core.periodictask.PeriodicTask;
+import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/** Factory class for creating message handlers for incoming helix messages. */
+public class ControllerUserDefinedMessageHandlerFactory implements
MessageHandlerFactory {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ControllerUserDefinedMessageHandlerFactory.class);
+ private static final String USER_DEFINED_MSG_STRING =
Message.MessageType.USER_DEFINE_MSG.toString();
+
+ private final PeriodicTaskScheduler _periodicTaskScheduler;
+
+ public ControllerUserDefinedMessageHandlerFactory(PeriodicTaskScheduler
periodicTaskScheduler) {
+ _periodicTaskScheduler = periodicTaskScheduler;
+ }
+
+ @Override
+ public MessageHandler createHandler(Message message, NotificationContext
notificationContext) {
+ String messageType = message.getMsgSubType();
+ if
(messageType.equals(RunPeriodicTaskMessage.RUN_PERIODIC_TASK_MSG_SUB_TYPE)) {
+ return new RunPeriodicTaskMessageHandler(new
RunPeriodicTaskMessage(message), notificationContext,
+ _periodicTaskScheduler);
+ }
+
+ // Log a warning and return no-op message handler for unsupported message
sub-types. This can happen when
+ // a new message sub-type is added, and the sender gets deployed first
while receiver is still running the
+ // old version.
+ LOGGER.warn("Received message with unsupported sub-type: {}, using no-op
message handler", messageType);
+ return new NoOpMessageHandler(message, notificationContext);
+ }
+
+ @Override
+ public String getMessageType() {
+ return USER_DEFINED_MSG_STRING;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ /** Message handler for {@link RunPeriodicTaskMessage} message. */
+ private static class RunPeriodicTaskMessageHandler extends MessageHandler {
+ private final String _periodicTaskRequestId;
+ private final String _periodicTaskName;
+ private final String _tableNameWithType;
+ private final PeriodicTaskScheduler _periodicTaskScheduler;
+
+ RunPeriodicTaskMessageHandler(RunPeriodicTaskMessage message,
NotificationContext context,
+ PeriodicTaskScheduler periodicTaskScheduler) {
+ super(message, context);
+ _periodicTaskRequestId = message.getPeriodicTaskRequestId();
+ _periodicTaskName = message.getPeriodicTaskName();
+ _tableNameWithType = message.getTableNameWithType();
+ _periodicTaskScheduler = periodicTaskScheduler;
+ }
+
+ @Override
+ public HelixTaskResult handleMessage()
+ throws InterruptedException {
+ LOGGER.info("[TaskRequestId: {}] Handling RunPeriodicTaskMessage by
executing task {}", _periodicTaskRequestId,
+ _periodicTaskName);
+ _periodicTaskScheduler
+ .scheduleNow(_periodicTaskName,
createTaskProperties(_periodicTaskRequestId, _tableNameWithType));
+ HelixTaskResult helixTaskResult = new HelixTaskResult();
+ helixTaskResult.setSuccess(true);
+ return helixTaskResult;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode errorCode, ErrorType errorType)
{
+ LOGGER.error("[TaskRequestId: {}] Message handling error.",
_periodicTaskRequestId, e);
+ }
+
+ private static Properties createTaskProperties(String
periodicTaskRequestId, String tableNameWithType) {
+ Properties periodicTaskParameters = new Properties();
+ if (periodicTaskRequestId != null) {
+
periodicTaskParameters.setProperty(PeriodicTask.PROPERTY_KEY_REQUEST_ID,
periodicTaskRequestId);
+ }
+
+ if (tableNameWithType != null) {
+
periodicTaskParameters.setProperty(PeriodicTask.PROPERTY_KEY_TABLE_NAME,
tableNameWithType);
+ }
+
+ return periodicTaskParameters;
+ }
+ }
+
+ /** Message handler for unknown messages */
+ private static class NoOpMessageHandler extends MessageHandler {
+ NoOpMessageHandler(Message message, NotificationContext context) {
+ super(message, context);
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() {
+ HelixTaskResult result = new HelixTaskResult();
+ result.setSuccess(true);
+ return result;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type) {
+ LOGGER.error("Got error for no-op message handling (error code: {},
error type: {})", code, type, e);
+ }
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index 099651a..3e9b33c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -45,6 +45,7 @@ public class Constants {
public static final String TABLE_NAME = "tableName";
public static final String ZOOKEEPER = "Zookeeper";
public static final String APP_CONFIGS = "AppConfigs";
+ public static final String PERIODIC_TASK_TAG = "PeriodicTask";
public static TableType validateTableType(String tableTypeStr) {
if (tableTypeStr == null || tableTypeStr.isEmpty()) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
new file mode 100644
index 0000000..99a597b
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.util.List;
+import java.util.UUID;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.PERIODIC_TASK_TAG)
+@Path("/periodictask")
+public class PinotControllerPeriodicTaskRestletResource {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PinotControllerPeriodicTaskRestletResource.class);
+ private static final String API_REQUEST_ID_PREFIX = "api-";
+
+ @Inject
+ PinotHelixResourceManager _pinotHelixResourceManager;
+
+ @Inject
+ PeriodicTaskScheduler _periodicTaskScheduler;
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/run")
+ @ApiOperation(value = "Run periodic task against table. If table name is
missing, task will run against all tables.")
+ public String runPeriodicTask(
+ @ApiParam(value = "Periodic task name", required = true)
@QueryParam("taskname") String periodicTaskName,
+ @ApiParam(value = "Name of the table") @QueryParam("tableName") String
tableName,
+ @ApiParam(value = "OFFLINE | REALTIME") @QueryParam("type") String
tableType) {
+
+ if (!_periodicTaskScheduler.hasTask(periodicTaskName)) {
+ throw new WebApplicationException("Periodic task '" + periodicTaskName +
"' not found.",
+ Response.Status.NOT_FOUND);
+ }
+
+ if (tableName != null) {
+ tableName = tableName.trim();
+ List<String> matchingTableNamesWithType = ResourceUtils
+ .getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, Constants.validateTableType(tableType),
+ LOGGER);
+
+ if (matchingTableNamesWithType.size() > 1) {
+ throw new WebApplicationException(
+ "More than one table matches Table '" + tableName + "'. Matching
names: " + matchingTableNamesWithType
+ .toString());
+ }
+
+ tableName = matchingTableNamesWithType.get(0);
+ }
+
+ // Generate an id for this request by taking first eight characters of a
randomly generated UUID. This request id
+ // is returned to the user and also appended to log messages so that user
can locate all log messages associated
+ // with this PeriodicTask's execution.
+ String periodicTaskRequestId = API_REQUEST_ID_PREFIX +
UUID.randomUUID().toString().substring(0, 8);
+
+ LOGGER.info(
+ "[TaskRequestId: {}] Sending periodic task message to all controllers
for running task {} against {}.",
+ periodicTaskRequestId, periodicTaskName, tableName != null ? " table
'" + tableName + "'" : "all tables");
+
+ // Create and send message to send to all controllers (including this one)
+ Criteria recipientCriteria = new Criteria();
+ recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ recipientCriteria.setInstanceName("%");
+ recipientCriteria.setSessionSpecific(true);
+
recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+ recipientCriteria.setSelfExcluded(false);
+ RunPeriodicTaskMessage runPeriodicTaskMessage =
+ new RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName,
tableName);
+
+ ClusterMessagingService clusterMessagingService =
+ _pinotHelixResourceManager.getHelixZkManager().getMessagingService();
+ int messageCount = clusterMessagingService.send(recipientCriteria,
runPeriodicTaskMessage, null, -1);
+ LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to
{} controllers.", periodicTaskRequestId,
+ messageCount);
+
+ return "{\"Log Request Id\": \"" + periodicTaskRequestId +
"\",\"Controllers notified\":" + (messageCount > 0)
+ + "}";
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/names")
+ @ApiOperation(value = "Get comma-delimited list of all available periodic
task names.")
+ public List<String> getPeriodicTaskNames() {
+ return _periodicTaskScheduler.getTaskNames();
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
index a4b6baf..0f14924 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.minion;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
@@ -55,7 +56,7 @@ public class MinionInstancesCleanupTask extends
BasePeriodicTask {
}
@Override
- protected void runTask() {
+ protected void runTask(Properties periodicTaskProperties) {
// Make it so that only one controller is responsible for cleaning up
minion instances.
if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) {
return;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
index 64e7303..d9dea38 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
@@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core.minion;
import java.util.List;
+import java.util.Properties;
import java.util.Set;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -59,7 +60,7 @@ public class TaskMetricsEmitter extends BasePeriodicTask {
}
@Override
- protected final void runTask() {
+ protected final void runTask(Properties periodicTaskProperties) {
// Make it so that only one controller returns the metric for all the
tasks.
if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) {
return;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 5aff79c..9d7a676 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.periodictask;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
@@ -27,6 +28,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.periodictask.BasePeriodicTask;
+import org.apache.pinot.core.periodictask.PeriodicTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,17 +57,31 @@ public abstract class ControllerPeriodicTask<C> extends
BasePeriodicTask {
}
@Override
- protected final void runTask() {
+ protected final void runTask(Properties periodicTaskProperties) {
_controllerMetrics.addMeteredTableValue(_taskName,
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L);
try {
+ // Check if we have a specific table against which this task needs to be
run.
+ String propTableNameWithType = (String)
periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME);
+
// Process the tables that are managed by this controller
List<String> tablesToProcess = new ArrayList<>();
- for (String tableNameWithType :
_pinotHelixResourceManager.getAllTables()) {
- if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
- tablesToProcess.add(tableNameWithType);
+ if (propTableNameWithType == null) {
+ // Table name is not available, so task should run on all tables for
which this controller is the lead.
+ for (String tableNameWithType :
_pinotHelixResourceManager.getAllTables()) {
+ if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
+ tablesToProcess.add(tableNameWithType);
+ }
+ }
+ } else {
+ // Table name is available, so task should run only on the specified
table.
+ if (_leadControllerManager.isLeaderForTable(propTableNameWithType)) {
+ tablesToProcess.add(propTableNameWithType);
}
}
- processTables(tablesToProcess);
+
+ if (!tablesToProcess.isEmpty()) {
+ processTables(tablesToProcess);
+ }
} catch (Exception e) {
LOGGER.error("Caught exception while running task: {}", _taskName, e);
_controllerMetrics.addMeteredTableValue(_taskName,
ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java
index 363c795..d6587af 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.helix.core.minion;
+import java.util.Properties;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.controller.helix.ControllerTest;
import org.testng.Assert;
@@ -38,22 +39,22 @@ public class MinionInstancesCleanupTaskTest extends
ControllerTest {
public void testMinionInstancesCleanupTask()
throws Exception {
MinionInstancesCleanupTask minionInstancesCleanupTask =
_controllerStarter.getMinionInstancesCleanupTask();
- minionInstancesCleanupTask.runTask();
+ minionInstancesCleanupTask.runTask(new Properties());
Assert.assertEquals(
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
0);
addFakeMinionInstancesToAutoJoinHelixCluster(3);
Assert.assertEquals(
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
0);
stopFakeInstance("Minion_localhost_0");
- minionInstancesCleanupTask.runTask();
+ minionInstancesCleanupTask.runTask(new Properties());
Assert.assertEquals(
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
1);
stopFakeInstance("Minion_localhost_1");
- minionInstancesCleanupTask.runTask();
+ minionInstancesCleanupTask.runTask(new Properties());
Assert.assertEquals(
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
2);
stopFakeInstance("Minion_localhost_2");
- minionInstancesCleanupTask.runTask();
+ minionInstancesCleanupTask.runTask(new Properties());
Assert.assertEquals(
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
3);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
index eb15aa9..f7a14de 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.core.periodictask;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,6 +32,7 @@ import org.slf4j.LoggerFactory;
@ThreadSafe
public abstract class BasePeriodicTask implements PeriodicTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(BasePeriodicTask.class);
+ private static final String DEFAULT_REQUEST_ID = "auto";
// Wait for at most 30 seconds while calling stop() for task to terminate
private static final long MAX_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L;
@@ -36,14 +40,26 @@ public abstract class BasePeriodicTask implements
PeriodicTask {
protected final String _taskName;
protected final long _intervalInSeconds;
protected final long _initialDelayInSeconds;
+ protected final ReentrantLock _runLock;
private volatile boolean _started;
private volatile boolean _running;
+ // Default properties that tasks may use during execution. This variable is
private and does not have any get or set
+ // methods to prevent subclasses from gaining direct access to this
variable. See run(Properties) method to see how
+ // properties are passed and used during task execution.
+ private static final Properties DEFAULT_PERIODIC_TASK_PROPERTIES;
+ static {
+ // Default properties for PeriodicTask execution.
+ DEFAULT_PERIODIC_TASK_PROPERTIES = new Properties();
+ DEFAULT_PERIODIC_TASK_PROPERTIES.put(PeriodicTask.PROPERTY_KEY_REQUEST_ID,
DEFAULT_REQUEST_ID);
+ }
+
public BasePeriodicTask(String taskName, long runFrequencyInSeconds, long
initialDelayInSeconds) {
_taskName = taskName;
_intervalInSeconds = runFrequencyInSeconds;
_initialDelayInSeconds = initialDelayInSeconds;
+ _runLock = new ReentrantLock();
}
@Override
@@ -87,13 +103,16 @@ public abstract class BasePeriodicTask implements
PeriodicTask {
LOGGER.warn("Task: {} is already started", _taskName);
return;
}
- _started = true;
try {
setUpTask();
} catch (Exception e) {
LOGGER.error("Caught exception while setting up task: {}", _taskName, e);
}
+
+ // mark _started as true only after state has completely initialized, so
that run method doesn't end up seeing
+ // partially initialized state.
+ _started = true;
}
/**
@@ -111,29 +130,41 @@ public abstract class BasePeriodicTask implements
PeriodicTask {
*/
@Override
public final void run() {
- _running = true;
+ // Pass default properties object to the actual run method.
+ run(DEFAULT_PERIODIC_TASK_PROPERTIES);
+ }
- if (_started) {
- long startTime = System.currentTimeMillis();
- LOGGER.info("Start running task: {}", _taskName);
- try {
- runTask();
- } catch (Exception e) {
- LOGGER.error("Caught exception while running task: {}", _taskName, e);
+ @Override
+ public final void run(Properties periodicTaskProperties) {
+ try {
+ // Don't allow a task to run more than once at a time.
+ _runLock.lock();
+ _running = true;
+
+ String periodicTaskRequestId =
periodicTaskProperties.getProperty(PeriodicTask.PROPERTY_KEY_REQUEST_ID);
+ if (_started) {
+ long startTime = System.currentTimeMillis();
+ LOGGER.info("[TaskRequestId: {}] Start running task: {}",
periodicTaskRequestId, _taskName);
+ try {
+ runTask(periodicTaskProperties);
+ } catch (Exception e) {
+ LOGGER.error("[TaskRequestId: {}] Caught exception while running
task: {}", periodicTaskRequestId, _taskName, e);
+ }
+ LOGGER.info("[TaskRequestId: {}] Finish running task: {} in {}ms",
periodicTaskRequestId, _taskName, System.currentTimeMillis() - startTime);
+ } else {
+ LOGGER.warn("[TaskRequestId: {}] Task: {} is skipped because it is not
started or already stopped", periodicTaskRequestId, _taskName);
}
- LOGGER.info("Finish running task: {} in {}ms", _taskName,
System.currentTimeMillis() - startTime);
- } else {
- LOGGER.warn("Task: {} is skipped because it is not started or already
stopped", _taskName);
+ } finally {
+ _runLock.unlock();
+ _running = false;
}
-
- _running = false;
}
/**
* Executes the task. This method should early terminate if {@code started}
flag is set to false by {@link #stop()}
* during execution.
*/
- protected abstract void runTask();
+ protected abstract void runTask(Properties periodicTaskProperties);
/**
* {@inheritDoc}
@@ -147,28 +178,22 @@ public abstract class BasePeriodicTask implements
PeriodicTask {
LOGGER.warn("Task: {} is not started", _taskName);
return;
}
+ long startTimeMs = System.currentTimeMillis();
_started = false;
- if (_running) {
- long startTimeMs = System.currentTimeMillis();
- long remainingTimeMs = MAX_PERIODIC_TASK_STOP_TIME_MILLIS;
- LOGGER.info("Task: {} is running, wait for at most {}ms for it to
finish", _taskName, remainingTimeMs);
- while (_running && remainingTimeMs > 0L) {
- long sleepTimeMs = Long.min(remainingTimeMs, 1000L);
- remainingTimeMs -= sleepTimeMs;
- try {
- Thread.sleep(sleepTimeMs);
- } catch (InterruptedException e) {
- LOGGER.error("Caught InterruptedException while waiting for task: {}
to finish", _taskName);
- Thread.currentThread().interrupt();
- break;
- }
- }
- long waitTimeMs = System.currentTimeMillis() - startTimeMs;
- if (_running) {
- LOGGER.warn("Task: {} is not finished in {}ms", waitTimeMs);
+ try {
+ // check if task is done running, or wait for the task to get done, by
trying to acquire runLock.
+ if (!_runLock.tryLock(MAX_PERIODIC_TASK_STOP_TIME_MILLIS,
TimeUnit.MILLISECONDS)) {
+ LOGGER.warn("Task {} could not be stopped within timeout of {}ms",
_taskName, MAX_PERIODIC_TASK_STOP_TIME_MILLIS);
} else {
- LOGGER.info("Task: {} is finished in {}ms", waitTimeMs);
+ LOGGER.info("Task {} successfully stopped in {}ms", _taskName,
System.currentTimeMillis() - startTimeMs);
+ }
+ } catch (InterruptedException ie) {
+ LOGGER.error("Caught InterruptedException while waiting for task: {} to
finish", _taskName);
+ Thread.currentThread().interrupt();
+ } finally {
+ if (_runLock.isHeldByCurrentThread()) {
+ _runLock.unlock();
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
index ee6c68b..4a5e120 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.periodictask;
+import java.util.Properties;
import javax.annotation.concurrent.ThreadSafe;
@@ -28,6 +29,10 @@ import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
public interface PeriodicTask extends Runnable {
+ // PeriodicTask objects may take a {@link Properties} object. Define all the
property keys here.
+ String PROPERTY_KEY_REQUEST_ID = "requestId";
+ String PROPERTY_KEY_TABLE_NAME = "tableNameWithType";
+
/**
* Returns the periodic task name.
* @return task name.
@@ -60,6 +65,12 @@ public interface PeriodicTask extends Runnable {
void run();
/**
+ * Execute the task with specified {@link Properties}.
+ * @param periodicTaskProperties Properties used by {@link PeriodicTask}
during execution.
+ */
+ void run(Properties periodicTaskProperties);
+
+ /**
* Stops the periodic task and performs necessary cleanups. Should be called
after removing the periodic task from the
* scheduler. Should be called after {@link #start()} getting called.
*/
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
index ffbc34a..8e42adf 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.periodictask;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -28,7 +29,8 @@ import org.slf4j.LoggerFactory;
/**
- * Periodic task scheduler will schedule a list of tasks based on their
initial delay time and interval time.
+ * Periodic task scheduler will schedule a list of tasks based on their
initial delay time and interval time. Tasks
+ * can also be scheduled for immediate execution by calling the scheduleNow()
method.
*/
public class PeriodicTaskScheduler {
private static final Logger LOGGER =
LoggerFactory.getLogger(PeriodicTaskScheduler.class);
@@ -109,4 +111,62 @@ public class PeriodicTaskScheduler {
_tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop);
}
}
+
+ /**
+  * Checks whether any task in {@code _tasksWithValidInterval} carries the given name.
+  *
+  * @param periodicTaskName name compared against each task's {@code getTaskName()}.
+  * @return true if task with given name exists; otherwise, false.
+  */
+ public boolean hasTask(String periodicTaskName) {
+   return _tasksWithValidInterval.stream()
+       .anyMatch(candidate -> candidate.getTaskName().equals(periodicTaskName));
+ }
+
+ /**
+  * Collects the names of the tasks that will run periodically.
+  *
+  * @return a newly created list with one name per task in {@code _tasksWithValidInterval}, in iteration order.
+  */
+ public List<String> getTaskNames() {
+   List<String> names = new ArrayList<>(_tasksWithValidInterval.size());
+   _tasksWithValidInterval.forEach(task -> names.add(task.getTaskName()));
+   return names;
+ }
+
+ /**
+  * Looks up a task by name in {@code _tasksWithValidInterval}.
+  *
+  * @return the first task whose name equals {@code periodicTaskName}, or {@code null} if there is none.
+  */
+ private PeriodicTask getPeriodicTask(String periodicTaskName) {
+   return _tasksWithValidInterval.stream()
+       .filter(candidate -> candidate.getTaskName().equals(periodicTaskName))
+       .findFirst()
+       .orElse(null);
+ }
+
+ /**
+  * Schedules the named {@link PeriodicTask} for immediate, one-off execution on the scheduler's executor.
+  *
+  * <p>If no task with the given name is known to this scheduler, an error is logged and nothing is scheduled.
+  *
+  * @param periodicTaskName name of the task to run.
+  * @param periodicTaskProperties properties passed to {@link PeriodicTask#run(Properties)}. The value stored under
+  *     {@link PeriodicTask#PROPERTY_KEY_REQUEST_ID}, if any, is used here only to tag log entries.
+  */
+ public void scheduleNow(String periodicTaskName, Properties periodicTaskProperties) {
+   // During controller deployment, each controller can have a slightly different list of periodic tasks if we add,
+   // remove, or rename a periodic task. To guard against this, we check again (besides the check at controller API
+   // level) whether the periodic task exists.
+   PeriodicTask periodicTask = getPeriodicTask(periodicTaskName);
+   if (periodicTask == null) {
+     LOGGER.error("Unknown Periodic Task {}", periodicTaskName);
+     return;
+   }
+
+   // The request id is only used for log correlation, so a missing id must not abort the request with a
+   // NullPointerException. Non-String values are still rendered through toString() as before.
+   Object requestIdValue = periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_REQUEST_ID);
+   String taskRequestId = requestIdValue == null ? null : requestIdValue.toString();
+   LOGGER.info(
+       "[TaskRequestId: {}] Schedule task '{}' to run immediately. If the task is already running, "
+           + "this run will wait until the current run finishes.",
+       taskRequestId, periodicTaskName);
+   _executorService.schedule(() -> {
+     try {
+       // Run the periodic task using the specified parameters. The call to run() method will block if another
+       // thread (the periodic execution thread or another thread calling this method) is already in the process
+       // of running the same task.
+       periodicTask.run(periodicTaskProperties);
+     } catch (Throwable t) {
+       // catch all errors to prevent subsequent executions from being silently suppressed
+       // Ref: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
+       LOGGER.error("[TaskRequestId: {}] Caught exception while attempting to execute named periodic task: {}",
+           taskRequestId, periodicTask.getTaskName(), t);
+     }
+   }, 0, TimeUnit.SECONDS);
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
index a4d581d..a033e06 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
@@ -21,8 +21,11 @@ package org.apache.pinot.core.periodictask;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.testng.Assert;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -45,7 +48,7 @@ public class PeriodicTaskSchedulerTest {
}
@Override
- protected void runTask() {
+ protected void runTask(Properties periodicTaskProperties) {
runCalled.set(true);
}
@@ -83,7 +86,7 @@ public class PeriodicTaskSchedulerTest {
}
@Override
- protected void runTask() {
+ protected void runTask(Properties periodicTaskProperties) {
numTimesRunCalled.getAndIncrement();
}
@@ -104,4 +107,82 @@ public class PeriodicTaskSchedulerTest {
assertEquals(numTimesRunCalled.get(), numTasks * 2);
assertEquals(numTimesStopCalled.get(), numTasks);
}
+
+ /**
+  * Test that {@link PeriodicTaskScheduler} is thread safe and does not run the same task more than once at any time.
+  * This is done by requesting execution of the same task object from 20 different threads while the scheduler is
+  * also running it periodically. While the test case issues 20 requests to keep {@link PeriodicTaskScheduler} busy,
+  * it waits for only around half of the executions to complete, and then confirms that all 20 threads had
+  * requested execution.
+  *
+  * <p>An atomic 'running' flag inside the task detects overlapping executions. A detected overlap is recorded in a
+  * shared flag and asserted on the test thread: an {@link AssertionError} thrown inside the task is raised on an
+  * executor thread and therefore never reaches the test thread, so it could not fail this test.
+  */
+ @Test
+ public void testConcurrentExecutionOfSameTask() throws Exception {
+   // Number of threads to run
+   final int numThreads = 20;
+
+   // Count number of threads that requested execution.
+   final AtomicInteger attempts = new AtomicInteger();
+
+   // Countdown latch to ensure that this test case will wait only for around half the tasks to complete.
+   final CountDownLatch countDownLatch = new CountDownLatch(numThreads / 2);
+
+   // Set to true if an execution starts while another execution of the same task is still in progress.
+   final AtomicBoolean overlapDetected = new AtomicBoolean();
+
+   // Create periodic task.
+   PeriodicTask task = new BasePeriodicTask("TestTask", 1L, 0L) {
+     private final AtomicBoolean _isRunning = new AtomicBoolean();
+
+     @Override
+     protected void runTask(Properties periodicTaskProperties) {
+       // Atomically claim the running flag; failing to claim it means the task is already running in another
+       // thread. Record the violation instead of throwing, so that the test thread can assert on it.
+       if (!_isRunning.compareAndSet(false, true)) {
+         overlapDetected.set(true);
+         return;
+       }
+       try {
+         Thread.sleep(200);
+         countDownLatch.countDown();
+       } catch (InterruptedException ie) {
+         Thread.currentThread().interrupt();
+       } finally {
+         _isRunning.set(false);
+       }
+     }
+   };
+
+   // Start scheduler with periodic task.
+   List<PeriodicTask> periodicTasks = new ArrayList<>();
+   periodicTasks.add(task);
+
+   PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+   taskScheduler.init(periodicTasks);
+   taskScheduler.start();
+
+   // Create multiple "execute" threads that try to run the same task that is already being run by scheduler
+   // on a periodic basis.
+   Thread[] threads = new Thread[numThreads];
+   Properties taskProperties = new Properties();
+   taskProperties.put(PeriodicTask.PROPERTY_KEY_REQUEST_ID, getClass().getSimpleName());
+   for (int i = 0; i < threads.length; i++) {
+     threads[i] = new Thread(() -> {
+       attempts.incrementAndGet();
+       taskScheduler.scheduleNow("TestTask", taskProperties);
+     });
+
+     threads[i].start();
+     try {
+       threads[i].join();
+     } catch (InterruptedException ie) {
+       Thread.currentThread().interrupt();
+     }
+   }
+
+   // Wait for around half the threads to finish running.
+   countDownLatch.await();
+
+   // stop task scheduler.
+   taskScheduler.stop();
+
+   // Fail on the test thread if any two executions of the task overlapped.
+   Assert.assertFalse(overlapDetected.get(), "More than one thread attempting to execute task at the same time.");
+
+   // Confirm that all threads requested execution, even though only half the threads completed execution.
+   Assert.assertEquals(attempts.get(), numThreads);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]