This is an automated email from the ASF dual-hosted git repository.

pratik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 75907372b0 Added validation for minionInstanceTag while updating / creating table-config (#14228)
75907372b0 is described below

commit 75907372b01573128fecb8c412169eaec6df760b
Author: tarun11Mavani <[email protected]>
AuthorDate: Mon Oct 21 21:37:37 2024 +0530

    Added validation for minionInstanceTag while updating / creating table-config (#14228)
    
    * Adding minion task config validation
    
    * Added UTs
    
    * Fixed failing UTs
    
    * Fixed failing UTs
    
    * Fixed failing UTs
    
    * validating the error message
---
 .../helix/core/PinotHelixResourceManager.java      | 29 +++++++++
 .../api/PinotInstanceRestletResourceTest.java      |  3 +-
 .../pinot/controller/helix/ControllerTest.java     |  2 +
 .../PinotHelixResourceManagerStatelessTest.java    | 76 +++++++++++++++++++++-
 .../core/minion/PinotTaskManagerStatelessTest.java |  2 +
 .../MergeRollupMinionClusterIntegrationTest.java   |  2 +-
 .../tests/PurgeMinionClusterIntegrationTest.java   |  2 +-
 7 files changed, 110 insertions(+), 6 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1b9f4d7164..1b4b722a7e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -146,6 +146,7 @@ import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
 import org.apache.pinot.controller.helix.core.lineage.LineageManager;
 import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
@@ -1689,6 +1690,11 @@ public class PinotHelixResourceManager {
     LOGGER.info("Adding table {}: Validate table configs", tableNameWithType);
     validateTableTenantConfig(tableConfig);
 
+    LOGGER.info("Adding table {}: Validate table task minion instance 
configs", tableNameWithType);
+    validateTableTaskMinionInstanceTagConfig(tableConfig);
+
+    LOGGER.info("Adding table {}: Successfully validated added table", 
tableNameWithType);
+
     IdealState idealState =
         PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, 
tableConfig.getReplication(),
             _enableBatchMessageMode);
@@ -1815,6 +1821,28 @@ public class PinotHelixResourceManager {
     }
   }
 
+  @VisibleForTesting
+  void validateTableTaskMinionInstanceTagConfig(TableConfig tableConfig) {
+
+    List<InstanceConfig> allMinionWorkerInstanceConfigs = 
getAllMinionInstanceConfigs();
+
+    //extract all minionInstanceTags from allMinionWorkerInstanceConfigs
+    Set<String> minionInstanceTagSet = 
allMinionWorkerInstanceConfigs.stream().map(InstanceConfig::getTags)
+        .collect(HashSet::new, Set::addAll, Set::addAll);
+
+    if (tableConfig.getTaskConfig() != null && 
tableConfig.getTaskConfig().getTaskTypeConfigsMap() != null) {
+      tableConfig.getTaskConfig().getTaskTypeConfigsMap().forEach((taskType, 
taskTypeConfig) -> {
+        String taskInstanceTag = 
taskTypeConfig.getOrDefault(PinotTaskManager.MINION_INSTANCE_TAG_CONFIG,
+            CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+        if (!minionInstanceTagSet.contains(taskInstanceTag)) {
+          throw new InvalidTableConfigException(
+              String.format("Failed to find minion instances with tag: %s for 
table: %s", taskInstanceTag,
+                  tableConfig.getTableName()));
+        }
+      });
+    }
+  }
+
   public boolean setZKData(String path, ZNRecord record, int expectedVersion, 
int accessOption) {
     return _helixDataAccessor.getBaseDataAccessor().set(path, record, 
expectedVersion, accessOption);
   }
@@ -1944,6 +1972,7 @@ public class PinotHelixResourceManager {
   public void updateTableConfig(TableConfig tableConfig)
       throws IOException {
     validateTableTenantConfig(tableConfig);
+    validateTableTaskMinionInstanceTagConfig(tableConfig);
     setExistingTableConfig(tableConfig);
   }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
index 93a142f221..9b7bad09ed 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
@@ -70,7 +70,8 @@ public class PinotInstanceRestletResourceTest extends 
ControllerTest {
   public void testInstanceListingAndCreation()
       throws Exception {
     String listInstancesUrl = _urlBuilder.forInstanceList();
-    int expectedNumInstances = 1 + DEFAULT_NUM_BROKER_INSTANCES + 
DEFAULT_NUM_SERVER_INSTANCES;
+    int expectedNumInstances =
+        1 + DEFAULT_NUM_BROKER_INSTANCES + DEFAULT_NUM_SERVER_INSTANCES + 
DEFAULT_NUM_MINION_INSTANCES;
     checkNumInstances(listInstancesUrl, expectedNumInstances);
 
     // Create untagged broker and server instances
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index fa5b8a0800..c0a3230e85 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -101,6 +101,7 @@ public class ControllerTest {
   public static final int DEFAULT_NUM_BROKER_INSTANCES = 3;
   // NOTE: To add HLC realtime table, number of Server instances must be 
multiple of replicas
   public static final int DEFAULT_NUM_SERVER_INSTANCES = 4;
+  public static final int DEFAULT_NUM_MINION_INSTANCES = 2;
 
   public static final long TIMEOUT_MS = 10_000L;
 
@@ -994,6 +995,7 @@ public class ControllerTest {
 
     
addMoreFakeBrokerInstancesToAutoJoinHelixCluster(DEFAULT_NUM_BROKER_INSTANCES, 
true);
     
addMoreFakeServerInstancesToAutoJoinHelixCluster(DEFAULT_NUM_SERVER_INSTANCES, 
true);
+    addFakeMinionInstancesToAutoJoinHelixCluster(DEFAULT_NUM_MINION_INSTANCES);
   }
 
   /**
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index feedaa8c32..902389bef4 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.helix.core;
 
 import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -58,10 +59,12 @@ import 
org.apache.pinot.controller.api.exception.InvalidTableConfigException;
 import org.apache.pinot.controller.api.resources.InstanceInfo;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
+import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.instance.Instance;
 import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TagOverrideConfig;
 import org.apache.pinot.spi.config.table.TenantConfig;
@@ -662,6 +665,73 @@ public class PinotHelixResourceManagerStatelessTest 
extends ControllerTest {
     resetServerTags();
   }
 
+  @Test
+  public void testValidateTableTaskMinionInstanceTagConfig() {
+    TableConfig realtimeTableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).build();
+
+    Map<String, String> segmentGenerationAndPushTaskConfig =
+        Map.of("invalidRecordsThresholdCount", "1", "schedule", "0 */30 * ? * 
*", "tableMaxNumTasks", "1",
+            "validDocIdsType", "SNAPSHOT");
+
+    Map<String, String> upsertCompactionTask =
+        Map.of("invalidRecordsThresholdCount", "1", "schedule", "0 */30 * ? * 
*", "tableMaxNumTasks", "1",
+            "validDocIdsType", "SNAPSHOT", "minionInstanceTag", 
"minionTenant");
+
+    Map<String, String> segmentGenerationAndPushTaskConfig2 =
+        Map.of("schedule", "0 */30 * ? * *", "tableMaxNumTasks", "1", 
"validDocIdsType", "SNAPSHOT",
+            "minionInstanceTag", "anotherMinionTenant");
+
+    // Minion instance tag set but no minion present
+    realtimeTableConfig.setTaskConfig(new TableTaskConfig(
+        
ImmutableMap.of(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, 
upsertCompactionTask)));
+
+    assertThrows(InvalidTableConfigException.class, () -> {
+      try {
+        
_helixResourceManager.validateTableTaskMinionInstanceTagConfig(realtimeTableConfig);
+      } catch (InvalidTableConfigException e) {
+        assertEquals(e.getMessage(),
+            "Failed to find minion instances with tag: minionTenant for table: 
testTable_REALTIME");
+        throw e;
+      }
+    });
+
+    // Valid minion instance tag with instances
+    addMinionInstance();
+    
_helixResourceManager.validateTableTaskMinionInstanceTagConfig(realtimeTableConfig);
+
+    //Untag minion instance
+    untagMinions();
+    realtimeTableConfig.setTaskConfig(new TableTaskConfig(
+        
ImmutableMap.of(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, 
segmentGenerationAndPushTaskConfig)));
+    
_helixResourceManager.validateTableTaskMinionInstanceTagConfig(realtimeTableConfig);
+
+    realtimeTableConfig.setTaskConfig(new TableTaskConfig(
+        
ImmutableMap.of(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, 
segmentGenerationAndPushTaskConfig2)));
+    assertThrows(InvalidTableConfigException.class, () -> {
+      try {
+        
_helixResourceManager.validateTableTaskMinionInstanceTagConfig(realtimeTableConfig);
+      } catch (InvalidTableConfigException e) {
+        assertEquals(e.getMessage(),
+            "Failed to find minion instances with tag: anotherMinionTenant for 
table: testTable_REALTIME");
+        throw e;
+      }
+    });
+  }
+
+  private void untagMinions() {
+    for (InstanceConfig minionInstance : 
_helixResourceManager.getAllMinionInstanceConfigs()) {
+      _helixResourceManager.updateInstanceTags(minionInstance.getId(), 
Helix.UNTAGGED_MINION_INSTANCE, false);
+    }
+  }
+
+  private void addMinionInstance() {
+    untagMinions();
+    Instance minionTenant =
+        new Instance("2.3.4.5", 2345, InstanceType.MINION, 
Collections.singletonList("minionTenant"), null, 0, 0, 0, 0,
+            false);
+    _helixResourceManager.addInstance(minionTenant, false);
+  }
+
   @Test
   public void testCreateColocatedTenant() {
     untagServers();
@@ -708,8 +778,9 @@ public class PinotHelixResourceManagerStatelessTest extends 
ControllerTest {
       throws Exception {
     // Create an instance with no tags
     String serverName = "Server_localhost_" + NUM_SERVER_INSTANCES;
-    Instance instance = new Instance("localhost", NUM_SERVER_INSTANCES, 
InstanceType.SERVER,
-        Collections.emptyList(), null, 0, 12345, 0, 0, false);
+    Instance instance =
+        new Instance("localhost", NUM_SERVER_INSTANCES, InstanceType.SERVER, 
Collections.emptyList(), null, 0, 12345, 0,
+            0, false);
     _helixResourceManager.addInstance(instance, false);
     addFakeServerInstanceToAutoJoinHelixClusterWithEmptyTag(serverName, false);
 
@@ -725,7 +796,6 @@ public class PinotHelixResourceManagerStatelessTest extends 
ControllerTest {
     // Takes care of the negative case
     assertFalse(untaggedServers.contains(SERVER_NAME_TAGGED), "Server with 
tags should not be considered untagged");
 
-
     stopAndDropFakeInstance(serverName);
 
     allInstances = _helixResourceManager.getAllInstances();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
index 6c63fd3a5a..4abc8bdaa6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
@@ -117,6 +117,7 @@ public class PinotTaskManagerStatelessTest extends 
ControllerTest {
     startController(properties);
     addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
     addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+    addFakeMinionInstancesToAutoJoinHelixCluster(1);
     Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
         .addSingleValueDimension("myMap", FieldSpec.DataType.STRING)
         .addSingleValueDimension("myMapStr", FieldSpec.DataType.STRING)
@@ -183,6 +184,7 @@ public class PinotTaskManagerStatelessTest extends 
ControllerTest {
     startController(properties);
     addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
     addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+    addFakeMinionInstancesToAutoJoinHelixCluster(1);
     Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
         .addSingleValueDimension("myMap", FieldSpec.DataType.STRING)
         .addSingleValueDimension("myMapStr", FieldSpec.DataType.STRING)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index 0b7dda70a4..3ba0d654fd 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -107,6 +107,7 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     startController();
     startBroker();
     startServer();
+    startMinion();
     // Start Kafka
     startKafka();
 
@@ -183,7 +184,6 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     // Initialize the query generator
     setUpQueryGenerator(avroFiles);
 
-    startMinion();
     _helixTaskResourceManager = 
_controllerStarter.getHelixTaskResourceManager();
     _taskManager = _controllerStarter.getTaskManager();
     _pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
index e53045240f..840e0c3eee 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -80,6 +80,7 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
     startController();
     startBroker();
     startServer();
+    startMinion();
 
     List<String> allTables = List.of(PURGE_FIRST_RUN_TABLE, 
PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE,
         PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
@@ -110,7 +111,6 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
       uploadSegments(tableName, _segmentTarDir);
     }
 
-    startMinion();
     setRecordPurger();
     _helixTaskResourceManager = 
_controllerStarter.getHelixTaskResourceManager();
     _taskManager = _controllerStarter.getTaskManager();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to