This is an automated email from the ASF dual-hosted git repository.
pratik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 75907372b0 Added validation for minionInstanceTag while updating /
creating table-config (#14228)
75907372b0 is described below
commit 75907372b01573128fecb8c412169eaec6df760b
Author: tarun11Mavani <[email protected]>
AuthorDate: Mon Oct 21 21:37:37 2024 +0530
Added validation for minionInstanceTag while updating / creating
table-config (#14228)
* Adding minion task config validation
* Added UTs
* Fixed failing UTs
* Fixed failing UTs
* Fixed failing UTs
* validating the error message
---
.../helix/core/PinotHelixResourceManager.java | 29 +++++++++
.../api/PinotInstanceRestletResourceTest.java | 3 +-
.../pinot/controller/helix/ControllerTest.java | 2 +
.../PinotHelixResourceManagerStatelessTest.java | 76 +++++++++++++++++++++-
.../core/minion/PinotTaskManagerStatelessTest.java | 2 +
.../MergeRollupMinionClusterIntegrationTest.java | 2 +-
.../tests/PurgeMinionClusterIntegrationTest.java | 2 +-
7 files changed, 110 insertions(+), 6 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1b9f4d7164..1b4b722a7e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -146,6 +146,7 @@ import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.controller.helix.core.lineage.LineageManager;
import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
@@ -1689,6 +1690,11 @@ public class PinotHelixResourceManager {
LOGGER.info("Adding table {}: Validate table configs", tableNameWithType);
validateTableTenantConfig(tableConfig);
+ LOGGER.info("Adding table {}: Validate table task minion instance
configs", tableNameWithType);
+ validateTableTaskMinionInstanceTagConfig(tableConfig);
+
+ LOGGER.info("Adding table {}: Successfully validated added table",
tableNameWithType);
+
IdealState idealState =
PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType,
tableConfig.getReplication(),
_enableBatchMessageMode);
@@ -1815,6 +1821,28 @@ public class PinotHelixResourceManager {
}
}
+ /**
+  * Validates that every minion task type configured on the table can be served by some minion instance.
+  *
+  * <p>For each entry of the table's task-type config map, the tag stored under
+  * {@link PinotTaskManager#MINION_INSTANCE_TAG_CONFIG} (or
+  * {@link CommonConstants.Helix#UNTAGGED_MINION_INSTANCE} when the key is absent) must be present among the
+  * tags of the instance configs returned by {@link #getAllMinionInstanceConfigs()}.
+  *
+  * <p>Tables without a task config, or with an empty task-type map, are accepted without looking up any
+  * minion instance config.
+  *
+  * @param tableConfig table config to validate
+  * @throws InvalidTableConfigException if a task type asks for a tag that no minion instance config carries
+  */
+ @VisibleForTesting
+ void validateTableTaskMinionInstanceTagConfig(TableConfig tableConfig) {
+   // Nothing to validate when no minion task is configured; return before fetching the instance configs
+   if (tableConfig.getTaskConfig() == null || tableConfig.getTaskConfig().getTaskTypeConfigsMap() == null
+       || tableConfig.getTaskConfig().getTaskTypeConfigsMap().isEmpty()) {
+     return;
+   }
+
+   // Union of the tags carried by all minion instance configs
+   Set<String> minionInstanceTagSet = new HashSet<>();
+   for (InstanceConfig minionInstanceConfig : getAllMinionInstanceConfigs()) {
+     minionInstanceTagSet.addAll(minionInstanceConfig.getTags());
+   }
+
+   tableConfig.getTaskConfig().getTaskTypeConfigsMap().forEach((taskType, taskTypeConfig) -> {
+     // A task type without an explicit tag is checked against the untagged-minion tag
+     String taskInstanceTag = taskTypeConfig.getOrDefault(PinotTaskManager.MINION_INSTANCE_TAG_CONFIG,
+         CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+     if (!minionInstanceTagSet.contains(taskInstanceTag)) {
+       throw new InvalidTableConfigException(
+           String.format("Failed to find minion instances with tag: %s for table: %s", taskInstanceTag,
+               tableConfig.getTableName()));
+     }
+   });
+ }
+
public boolean setZKData(String path, ZNRecord record, int expectedVersion,
int accessOption) {
return _helixDataAccessor.getBaseDataAccessor().set(path, record,
expectedVersion, accessOption);
}
@@ -1944,6 +1972,7 @@ public class PinotHelixResourceManager {
public void updateTableConfig(TableConfig tableConfig)
throws IOException {
validateTableTenantConfig(tableConfig);
+ validateTableTaskMinionInstanceTagConfig(tableConfig);
setExistingTableConfig(tableConfig);
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
index 93a142f221..9b7bad09ed 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
@@ -70,7 +70,8 @@ public class PinotInstanceRestletResourceTest extends
ControllerTest {
public void testInstanceListingAndCreation()
throws Exception {
String listInstancesUrl = _urlBuilder.forInstanceList();
- int expectedNumInstances = 1 + DEFAULT_NUM_BROKER_INSTANCES +
DEFAULT_NUM_SERVER_INSTANCES;
+ int expectedNumInstances =
+ 1 + DEFAULT_NUM_BROKER_INSTANCES + DEFAULT_NUM_SERVER_INSTANCES +
DEFAULT_NUM_MINION_INSTANCES;
checkNumInstances(listInstancesUrl, expectedNumInstances);
// Create untagged broker and server instances
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index fa5b8a0800..c0a3230e85 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -101,6 +101,7 @@ public class ControllerTest {
public static final int DEFAULT_NUM_BROKER_INSTANCES = 3;
// NOTE: To add HLC realtime table, number of Server instances must be
multiple of replicas
public static final int DEFAULT_NUM_SERVER_INSTANCES = 4;
+ public static final int DEFAULT_NUM_MINION_INSTANCES = 2;
public static final long TIMEOUT_MS = 10_000L;
@@ -994,6 +995,7 @@ public class ControllerTest {
addMoreFakeBrokerInstancesToAutoJoinHelixCluster(DEFAULT_NUM_BROKER_INSTANCES,
true);
addMoreFakeServerInstancesToAutoJoinHelixCluster(DEFAULT_NUM_SERVER_INSTANCES,
true);
+ addFakeMinionInstancesToAutoJoinHelixCluster(DEFAULT_NUM_MINION_INSTANCES);
}
/**
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index feedaa8c32..902389bef4 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core;
import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -58,10 +59,12 @@ import
org.apache.pinot.controller.api.exception.InvalidTableConfigException;
import org.apache.pinot.controller.api.resources.InstanceInfo;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
+import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.instance.Instance;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TagOverrideConfig;
import org.apache.pinot.spi.config.table.TenantConfig;
@@ -662,6 +665,73 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
resetServerTags();
}
+ @Test
+ public void testValidateTableTaskMinionInstanceTagConfig() {
+ TableConfig realtimeTableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).build();
+
+ Map<String, String> segmentGenerationAndPushTaskConfig =
+ Map.of("invalidRecordsThresholdCount", "1", "schedule", "0 */30 * ? *
*", "tableMaxNumTasks", "1",
+ "validDocIdsType", "SNAPSHOT");
+
+ Map<String, String> upsertCompactionTask =
+ Map.of("invalidRecordsThresholdCount", "1", "schedule", "0 */30 * ? *
*", "tableMaxNumTasks", "1",
+ "validDocIdsType", "SNAPSHOT", "minionInstanceTag",
"minionTenant");
+
+ Map<String, String> segmentGenerationAndPushTaskConfig2 =
+ Map.of("schedule", "0 */30 * ? * *", "tableMaxNumTasks", "1",
"validDocIdsType", "SNAPSHOT",
+ "minionInstanceTag", "anotherMinionTenant");
+
+ // Minion instance tag set but no minion present
+ realtimeTableConfig.setTaskConfig(new TableTaskConfig(
+
ImmutableMap.of(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
upsertCompactionTask)));
+
+ assertThrows(InvalidTableConfigException.class, () -> {
+ try {
+
_helixResourceManager.validateTableTaskMinionInstanceTagConfig(realtimeTableConfig);
+ } catch (InvalidTableConfigException e) {
+ assertEquals(e.getMessage(),
+ "Failed to find minion instances with tag: minionTenant for table:
testTable_REALTIME");
+ throw e;
+ }
+ });
+
+ // Valid minion instance tag with instances
+ addMinionInstance();
+
_helixResourceManager.validateTableTaskMinionInstanceTagConfig(realtimeTableConfig);
+
+ //Untag minion instance
+ untagMinions();
+ realtimeTableConfig.setTaskConfig(new TableTaskConfig(
+
ImmutableMap.of(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
segmentGenerationAndPushTaskConfig)));
+
_helixResourceManager.validateTableTaskMinionInstanceTagConfig(realtimeTableConfig);
+
+ realtimeTableConfig.setTaskConfig(new TableTaskConfig(
+
ImmutableMap.of(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
segmentGenerationAndPushTaskConfig2)));
+ assertThrows(InvalidTableConfigException.class, () -> {
+ try {
+
_helixResourceManager.validateTableTaskMinionInstanceTagConfig(realtimeTableConfig);
+ } catch (InvalidTableConfigException e) {
+ assertEquals(e.getMessage(),
+ "Failed to find minion instances with tag: anotherMinionTenant for
table: testTable_REALTIME");
+ throw e;
+ }
+ });
+ }
+
+ /** Sets the tags of every minion instance config to {@code Helix.UNTAGGED_MINION_INSTANCE}. */
+ private void untagMinions() {
+   _helixResourceManager.getAllMinionInstanceConfigs().forEach(
+       minionConfig -> _helixResourceManager.updateInstanceTags(minionConfig.getId(),
+           Helix.UNTAGGED_MINION_INSTANCE, false));
+ }
+
+ /** Untags the existing minions, then adds a minion instance (2.3.4.5:2345) tagged {@code "minionTenant"}. */
+ private void addMinionInstance() {
+   untagMinions();
+   _helixResourceManager.addInstance(
+       new Instance("2.3.4.5", 2345, InstanceType.MINION, Collections.singletonList("minionTenant"), null, 0, 0,
+           0, 0, false), false);
+ }
+
@Test
public void testCreateColocatedTenant() {
untagServers();
@@ -708,8 +778,9 @@ public class PinotHelixResourceManagerStatelessTest extends
ControllerTest {
throws Exception {
// Create an instance with no tags
String serverName = "Server_localhost_" + NUM_SERVER_INSTANCES;
- Instance instance = new Instance("localhost", NUM_SERVER_INSTANCES,
InstanceType.SERVER,
- Collections.emptyList(), null, 0, 12345, 0, 0, false);
+ Instance instance =
+ new Instance("localhost", NUM_SERVER_INSTANCES, InstanceType.SERVER,
Collections.emptyList(), null, 0, 12345, 0,
+ 0, false);
_helixResourceManager.addInstance(instance, false);
addFakeServerInstanceToAutoJoinHelixClusterWithEmptyTag(serverName, false);
@@ -725,7 +796,6 @@ public class PinotHelixResourceManagerStatelessTest extends
ControllerTest {
// Takes care of the negative case
assertFalse(untaggedServers.contains(SERVER_NAME_TAGGED), "Server with
tags should not be considered untagged");
-
stopAndDropFakeInstance(serverName);
allInstances = _helixResourceManager.getAllInstances();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
index 6c63fd3a5a..4abc8bdaa6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
@@ -117,6 +117,7 @@ public class PinotTaskManagerStatelessTest extends
ControllerTest {
startController(properties);
addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeMinionInstancesToAutoJoinHelixCluster(1);
Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
.addSingleValueDimension("myMap", FieldSpec.DataType.STRING)
.addSingleValueDimension("myMapStr", FieldSpec.DataType.STRING)
@@ -183,6 +184,7 @@ public class PinotTaskManagerStatelessTest extends
ControllerTest {
startController(properties);
addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeMinionInstancesToAutoJoinHelixCluster(1);
Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
.addSingleValueDimension("myMap", FieldSpec.DataType.STRING)
.addSingleValueDimension("myMapStr", FieldSpec.DataType.STRING)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index 0b7dda70a4..3ba0d654fd 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -107,6 +107,7 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
startController();
startBroker();
startServer();
+ startMinion();
// Start Kafka
startKafka();
@@ -183,7 +184,6 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
// Initialize the query generator
setUpQueryGenerator(avroFiles);
- startMinion();
_helixTaskResourceManager =
_controllerStarter.getHelixTaskResourceManager();
_taskManager = _controllerStarter.getTaskManager();
_pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
index e53045240f..840e0c3eee 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -80,6 +80,7 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
startController();
startBroker();
startServer();
+ startMinion();
List<String> allTables = List.of(PURGE_FIRST_RUN_TABLE,
PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE,
PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
@@ -110,7 +111,6 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
uploadSegments(tableName, _segmentTarDir);
}
- startMinion();
setRecordPurger();
_helixTaskResourceManager =
_controllerStarter.getHelixTaskResourceManager();
_taskManager = _controllerStarter.getTaskManager();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]