This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c8c0ae  Add config to disable HLC realtime segment completion (#4235)
8c8c0ae is described below

commit 8c8c0ae999ab7277b90e003af9ab1a034f64025e
Author: Jialiang Li <[email protected]>
AuthorDate: Wed May 29 09:49:52 2019 -0700

    Add config to disable HLC realtime segment completion (#4235)
    
    * Add config to disable HLC realtime segment completion
---
 .../apache/pinot/controller/ControllerConf.java    | 15 ++++++--
 .../apache/pinot/controller/ControllerStarter.java | 16 ++++++---
 .../helix/core/PinotHelixResourceManager.java      | 25 +++++++++++---
 .../tests/LLCRealtimeClusterIntegrationTest.java   | 40 ++++++++++++++++++++++
 4 files changed, 85 insertions(+), 11 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 7fc638a..0b23fdb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -32,7 +32,6 @@ import 
org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
 import org.apache.pinot.filesystem.LocalPinotFS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -136,8 +135,11 @@ public class ControllerConf extends 
PropertiesConfiguration {
   private static final String REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS =
       "controller.realtime.segment.metadata.commit.numLocks";
   private static final String ENABLE_STORAGE_QUOTA_CHECK = 
"controller.enable.storage.quota.check";
-
   private static final String ENABLE_BATCH_MESSAGE_MODE = 
"controller.enable.batch.message.mode";
+  // It is used to disable the HLC realtime segment completion and disallow HLC table in the cluster. True by default.
+  // If it's set to false, existing HLC realtime tables will stop consumption, and creation of new HLC tables will be disallowed.
+  // Please make sure there is no HLC table running in the cluster before disallowing it.
+  private static final String ALLOW_HLC_TABLES = "controller.allow.hlc.tables";
 
   // Defines the kind of storage and the underlying PinotFS implementation
   private static final String PINOT_FS_FACTORY_CLASS_PREFIX = 
"controller.storage.factory.class";
@@ -155,6 +157,7 @@ public class ControllerConf extends PropertiesConfiguration 
{
   private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 
64;
   private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
   private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false;
+  private static final boolean DEFAULT_ALLOW_HLC_TABLES = true;
   private static final String DEFAULT_CONTROLLER_MODE = 
ControllerMode.DUAL.name();
 
   private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = 
LocalPinotFS.class.getName();
@@ -637,4 +640,12 @@ public class ControllerConf extends 
PropertiesConfiguration {
   public ControllerMode getControllerMode() {
     return ControllerMode.valueOf(getString(CONTROLLER_MODE, 
DEFAULT_CONTROLLER_MODE).toUpperCase());
   }
+
+  public boolean getHLCTablesAllowed() {
+    return getBoolean(ALLOW_HLC_TABLES, DEFAULT_ALLOW_HLC_TABLES);
+  }
+
+  public void setHLCTablesAllowed(boolean allowHLCTables) {
+    setProperty(ALLOW_HLC_TABLES, allowHLCTables);
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 67a306f..36f6d20 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -274,8 +274,14 @@ public class ControllerStarter {
         new SegmentCompletionManager(helixParticipantManager, 
_pinotLLCRealtimeSegmentManager, _controllerMetrics,
             _controllerLeadershipManager, 
_config.getSegmentCommitTimeoutSeconds());
 
-    _realtimeSegmentsManager = new 
PinotRealtimeSegmentManager(_helixResourceManager, 
_controllerLeadershipManager);
-    _realtimeSegmentsManager.start(_controllerMetrics);
+    if (_config.getHLCTablesAllowed()) {
+      LOGGER.info("Realtime tables with High Level consumers will be supported");
+      _realtimeSegmentsManager = new 
PinotRealtimeSegmentManager(_helixResourceManager, 
_controllerLeadershipManager);
+      _realtimeSegmentsManager.start(_controllerMetrics);
+    } else {
+      LOGGER.info("Realtime tables with High Level consumers will NOT be supported");
+      _realtimeSegmentsManager = null;
+    }
 
     // Setting up periodic tasks
     List<PeriodicTask> controllerPeriodicTasks = 
setupControllerPeriodicTasks();
@@ -484,8 +490,10 @@ public class ControllerStarter {
       LOGGER.info("Stopping Jersey admin API");
       _adminApp.stop();
 
-      LOGGER.info("Stopping realtime segment manager");
-      _realtimeSegmentsManager.stop();
+      if (_realtimeSegmentsManager != null) {
+        LOGGER.info("Stopping realtime segment manager");
+        _realtimeSegmentsManager.stop();
+      }
 
       LOGGER.info("Stopping resource manager");
       _helixResourceManager.stop();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ef883e9..aaa9ea8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -131,6 +131,7 @@ public class PinotHelixResourceManager {
   private final long _externalViewOnlineToOfflineTimeoutMillis;
   private final boolean _isSingleTenantCluster;
   private final boolean _enableBatchMessageMode;
+  private final boolean _allowHLCTables;
 
   private HelixManager _helixZkManager;
   private HelixAdmin _helixAdmin;
@@ -145,7 +146,7 @@ public class PinotHelixResourceManager {
 
   public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String 
helixClusterName,
       @Nonnull String controllerInstanceId, String dataDir, long 
externalViewOnlineToOfflineTimeoutMillis,
-      boolean isSingleTenantCluster, boolean enableBatchMessageMode) {
+      boolean isSingleTenantCluster, boolean enableBatchMessageMode, boolean 
allowHLCTables) {
     _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
     _helixClusterName = helixClusterName;
     _instanceId = controllerInstanceId;
@@ -153,19 +154,20 @@ public class PinotHelixResourceManager {
     _externalViewOnlineToOfflineTimeoutMillis = 
externalViewOnlineToOfflineTimeoutMillis;
     _isSingleTenantCluster = isSingleTenantCluster;
     _enableBatchMessageMode = enableBatchMessageMode;
+    _allowHLCTables = allowHLCTables;
   }
 
   public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String 
helixClusterName,
       @Nonnull String controllerInstanceId, @Nonnull String dataDir) {
     this(zkURL, helixClusterName, controllerInstanceId, dataDir, 
DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS, false,
-        true);
+        true, true);
   }
 
   public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
     this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
         controllerConf.getControllerHost() + "_" + 
controllerConf.getControllerPort(), controllerConf.getDataDir(),
         controllerConf.getExternalViewOnlineToOfflineTimeout(), 
controllerConf.tenantIsolationEnabled(),
-        controllerConf.getEnableBatchMessageMode());
+        controllerConf.getEnableBatchMessageMode(), 
controllerConf.getHLCTablesAllowed());
   }
 
   /**
@@ -1098,6 +1100,9 @@ public class PinotHelixResourceManager {
         updateReplicaGroupPartitionAssignment(tableConfig);
         break;
       case REALTIME:
+        IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+        verifyIndexingConfig(tableNameWithType, indexingConfig);
+
         // Ensure that realtime table is not created if schema is not present
         Schema schema =
             ZKMetadataProvider.getSchema(_propertyStore, 
TableNameBuilder.extractRawTableName(tableNameWithType));
@@ -1130,7 +1135,6 @@ public class PinotHelixResourceManager {
          * We also need to support the case when a high-level consumer already 
exists for a table and we are adding
          * the low-level consumers.
          */
-        IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
         ensureRealtimeClusterIsSetUp(tableConfig, tableNameWithType, 
indexingConfig);
 
         LOGGER.info("Successfully added or updated the table {} ", 
tableNameWithType);
@@ -1270,6 +1274,14 @@ public class PinotHelixResourceManager {
     _rebalanceSegmentStrategyFactory = rebalanceSegmentStrategyFactory;
   }
 
+  private void verifyIndexingConfig(String tableNameWithType, IndexingConfig 
indexingConfig) {
+    // Check if HLC table is allowed.
+    StreamConfig streamConfig = new 
StreamConfig(indexingConfig.getStreamConfigs());
+    if (streamConfig.hasHighLevelConsumerType() && !_allowHLCTables) {
+      throw new InvalidTableConfigException("Creating HLC realtime table is not allowed for Table: " + tableNameWithType);
+    }
+  }
+
   private void ensureRealtimeClusterIsSetUp(TableConfig config, String 
realtimeTableName,
       IndexingConfig indexingConfig) {
     StreamConfig streamConfig = new 
StreamConfig(indexingConfig.getStreamConfigs());
@@ -1346,12 +1358,13 @@ public class PinotHelixResourceManager {
       throws IOException {
 
     if (tableType == TableType.REALTIME) {
+      IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+      verifyIndexingConfig(tableNameWithType, indexingConfig);
       // Update replica group partition assignment to the property store if 
applicable
       if (ReplicationUtils.setupRealtimeReplicaGroups(tableConfig)) {
         updateReplicaGroupPartitionAssignment(tableConfig);
       }
       ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, 
tableNameWithType, tableConfig.toZNRecord());
-      IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
       ensureRealtimeClusterIsSetUp(tableConfig, tableNameWithType, 
indexingConfig);
     } else if (tableType == TableType.OFFLINE) {
       // Update replica group partition assignment to the property store if 
applicable
@@ -1408,6 +1421,8 @@ public class PinotHelixResourceManager {
     setExistingTableConfig(tableConfig, tableNameWithType, type);
 
     if (type == TableType.REALTIME) {
+      // Check if HLC table is allowed
+      verifyIndexingConfig(tableNameWithType, newConfigs);
       ensureRealtimeClusterIsSetUp(tableConfig, tableName, newConfigs);
     }
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 14b0c2c..2f145a8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -21,15 +21,24 @@ package org.apache.pinot.integration.tests;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Function;
 import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import org.apache.avro.reflect.Nullable;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.config.IndexingConfig;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.config.TableCustomConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.config.TenantConfig;
 import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -70,6 +79,13 @@ public class LLCRealtimeClusterIntegrationTest extends 
RealtimeClusterIntegratio
   }
 
   @Override
+  public void startController() {
+    ControllerConf controllerConfig = getDefaultControllerConfiguration();
+    controllerConfig.setHLCTablesAllowed(false);
+    startController(controllerConfig);
+  }
+
+  @Override
   protected boolean useLlc() {
     return true;
   }
@@ -142,5 +158,29 @@ public class LLCRealtimeClusterIntegrationTest extends 
RealtimeClusterIntegratio
       }
     }, 600_000L, "Failed to generate inverted index");
   }
+
+  @Test
+  public void testAddHLCTableShouldFail() {
+    TableConfig tableConfig = new TableConfig();
+    IndexingConfig indexingConfig = new IndexingConfig();
+    Map<String, String> streamConfigs = new HashMap<>();
+    streamConfigs.put("stream.kafka.consumer.type", "HIGHLEVEL");
+    indexingConfig.setStreamConfigs(streamConfigs);
+    tableConfig.setIndexingConfig(indexingConfig);
+    tableConfig.setTableName("testTable");
+    tableConfig.setTableType(CommonConstants.Helix.TableType.REALTIME);
+    SegmentsValidationAndRetentionConfig validationAndRetentionConfig = new 
SegmentsValidationAndRetentionConfig();
+    tableConfig.setValidationConfig(validationAndRetentionConfig);
+    TenantConfig tenantConfig = new TenantConfig();
+    tableConfig.setTenantConfig(tenantConfig);
+    TableCustomConfig tableCustomConfig = new TableCustomConfig();
+    tableConfig.setCustomConfig(tableCustomConfig);
+    try {
+      sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), 
tableConfig.toJsonConfigString());
+      Assert.fail();
+    } catch (IOException e) {
+      // Expected
+    }
+  }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to