This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8c8c0ae Add config to disable HLC realtime segment completion (#4235)
8c8c0ae is described below
commit 8c8c0ae999ab7277b90e003af9ab1a034f64025e
Author: Jialiang Li <[email protected]>
AuthorDate: Wed May 29 09:49:52 2019 -0700
Add config to disable HLC realtime segment completion (#4235)
* Add config to disable HLC realtime segment completion
---
.../apache/pinot/controller/ControllerConf.java | 15 ++++++--
.../apache/pinot/controller/ControllerStarter.java | 16 ++++++---
.../helix/core/PinotHelixResourceManager.java | 25 +++++++++++---
.../tests/LLCRealtimeClusterIntegrationTest.java | 40 ++++++++++++++++++++++
4 files changed, 85 insertions(+), 11 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 7fc638a..0b23fdb 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -32,7 +32,6 @@ import
org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
import org.apache.pinot.filesystem.LocalPinotFS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -136,8 +135,11 @@ public class ControllerConf extends
PropertiesConfiguration {
private static final String REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS =
"controller.realtime.segment.metadata.commit.numLocks";
private static final String ENABLE_STORAGE_QUOTA_CHECK =
"controller.enable.storage.quota.check";
-
private static final String ENABLE_BATCH_MESSAGE_MODE =
"controller.enable.batch.message.mode";
+ // It is used to disable the HLC realtime segment completion and disallow
HLC table in the cluster. True by default.
+ // If it's set to false, existing HLC realtime tables will stop consumption,
and creation of new HLC tables will be disallowed.
+ // Please make sure there is no HLC table running in the cluster before
disallowing it.
+ private static final String ALLOW_HLC_TABLES = "controller.allow.hlc.tables";
// Defines the kind of storage and the underlying PinotFS implementation
private static final String PINOT_FS_FACTORY_CLASS_PREFIX =
"controller.storage.factory.class";
@@ -155,6 +157,7 @@ public class ControllerConf extends PropertiesConfiguration
{
private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS =
64;
private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false;
+ private static final boolean DEFAULT_ALLOW_HLC_TABLES = true;
private static final String DEFAULT_CONTROLLER_MODE =
ControllerMode.DUAL.name();
private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL =
LocalPinotFS.class.getName();
@@ -637,4 +640,12 @@ public class ControllerConf extends
PropertiesConfiguration {
public ControllerMode getControllerMode() {
return ControllerMode.valueOf(getString(CONTROLLER_MODE,
DEFAULT_CONTROLLER_MODE).toUpperCase());
}
+
+ public boolean getHLCTablesAllowed() {
+ return getBoolean(ALLOW_HLC_TABLES, DEFAULT_ALLOW_HLC_TABLES);
+ }
+
+ public void setHLCTablesAllowed(boolean allowHLCTables) {
+ setProperty(ALLOW_HLC_TABLES, allowHLCTables);
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 67a306f..36f6d20 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -274,8 +274,14 @@ public class ControllerStarter {
new SegmentCompletionManager(helixParticipantManager,
_pinotLLCRealtimeSegmentManager, _controllerMetrics,
_controllerLeadershipManager,
_config.getSegmentCommitTimeoutSeconds());
- _realtimeSegmentsManager = new
PinotRealtimeSegmentManager(_helixResourceManager,
_controllerLeadershipManager);
- _realtimeSegmentsManager.start(_controllerMetrics);
+ if (_config.getHLCTablesAllowed()) {
+ LOGGER.info("Realtime tables with High Level consumers will be
supported");
+ _realtimeSegmentsManager = new
PinotRealtimeSegmentManager(_helixResourceManager,
_controllerLeadershipManager);
+ _realtimeSegmentsManager.start(_controllerMetrics);
+ } else {
+ LOGGER.info("Realtime tables with High Level consumers will NOT be
supported");
+ _realtimeSegmentsManager = null;
+ }
// Setting up periodic tasks
List<PeriodicTask> controllerPeriodicTasks =
setupControllerPeriodicTasks();
@@ -484,8 +490,10 @@ public class ControllerStarter {
LOGGER.info("Stopping Jersey admin API");
_adminApp.stop();
- LOGGER.info("Stopping realtime segment manager");
- _realtimeSegmentsManager.stop();
+ if (_realtimeSegmentsManager != null) {
+ LOGGER.info("Stopping realtime segment manager");
+ _realtimeSegmentsManager.stop();
+ }
LOGGER.info("Stopping resource manager");
_helixResourceManager.stop();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ef883e9..aaa9ea8 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -131,6 +131,7 @@ public class PinotHelixResourceManager {
private final long _externalViewOnlineToOfflineTimeoutMillis;
private final boolean _isSingleTenantCluster;
private final boolean _enableBatchMessageMode;
+ private final boolean _allowHLCTables;
private HelixManager _helixZkManager;
private HelixAdmin _helixAdmin;
@@ -145,7 +146,7 @@ public class PinotHelixResourceManager {
public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String
helixClusterName,
@Nonnull String controllerInstanceId, String dataDir, long
externalViewOnlineToOfflineTimeoutMillis,
- boolean isSingleTenantCluster, boolean enableBatchMessageMode) {
+ boolean isSingleTenantCluster, boolean enableBatchMessageMode, boolean
allowHLCTables) {
_helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
_helixClusterName = helixClusterName;
_instanceId = controllerInstanceId;
@@ -153,19 +154,20 @@ public class PinotHelixResourceManager {
_externalViewOnlineToOfflineTimeoutMillis =
externalViewOnlineToOfflineTimeoutMillis;
_isSingleTenantCluster = isSingleTenantCluster;
_enableBatchMessageMode = enableBatchMessageMode;
+ _allowHLCTables = allowHLCTables;
}
public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String
helixClusterName,
@Nonnull String controllerInstanceId, @Nonnull String dataDir) {
this(zkURL, helixClusterName, controllerInstanceId, dataDir,
DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS, false,
- true);
+ true, true);
}
public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
controllerConf.getControllerHost() + "_" +
controllerConf.getControllerPort(), controllerConf.getDataDir(),
controllerConf.getExternalViewOnlineToOfflineTimeout(),
controllerConf.tenantIsolationEnabled(),
- controllerConf.getEnableBatchMessageMode());
+ controllerConf.getEnableBatchMessageMode(),
controllerConf.getHLCTablesAllowed());
}
/**
@@ -1098,6 +1100,9 @@ public class PinotHelixResourceManager {
updateReplicaGroupPartitionAssignment(tableConfig);
break;
case REALTIME:
+ IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+ verifyIndexingConfig(tableNameWithType, indexingConfig);
+
// Ensure that realtime table is not created if schema is not present
Schema schema =
ZKMetadataProvider.getSchema(_propertyStore,
TableNameBuilder.extractRawTableName(tableNameWithType));
@@ -1130,7 +1135,6 @@ public class PinotHelixResourceManager {
* We also need to support the case when a high-level consumer already
exists for a table and we are adding
* the low-level consumers.
*/
- IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
ensureRealtimeClusterIsSetUp(tableConfig, tableNameWithType,
indexingConfig);
LOGGER.info("Successfully added or updated the table {} ",
tableNameWithType);
@@ -1270,6 +1274,14 @@ public class PinotHelixResourceManager {
_rebalanceSegmentStrategyFactory = rebalanceSegmentStrategyFactory;
}
+ private void verifyIndexingConfig(String tableNameWithType, IndexingConfig
indexingConfig) {
+ // Check if HLC table is allowed.
+ StreamConfig streamConfig = new
StreamConfig(indexingConfig.getStreamConfigs());
+ if (streamConfig.hasHighLevelConsumerType() && !_allowHLCTables) {
+ throw new InvalidTableConfigException("Creating HLC realtime table is
not allowed for Table: " + tableNameWithType);
+ }
+ }
+
private void ensureRealtimeClusterIsSetUp(TableConfig config, String
realtimeTableName,
IndexingConfig indexingConfig) {
StreamConfig streamConfig = new
StreamConfig(indexingConfig.getStreamConfigs());
@@ -1346,12 +1358,13 @@ public class PinotHelixResourceManager {
throws IOException {
if (tableType == TableType.REALTIME) {
+ IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+ verifyIndexingConfig(tableNameWithType, indexingConfig);
// Update replica group partition assignment to the property store if
applicable
if (ReplicationUtils.setupRealtimeReplicaGroups(tableConfig)) {
updateReplicaGroupPartitionAssignment(tableConfig);
}
ZKMetadataProvider.setRealtimeTableConfig(_propertyStore,
tableNameWithType, tableConfig.toZNRecord());
- IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
ensureRealtimeClusterIsSetUp(tableConfig, tableNameWithType,
indexingConfig);
} else if (tableType == TableType.OFFLINE) {
// Update replica group partition assignment to the property store if
applicable
@@ -1408,6 +1421,8 @@ public class PinotHelixResourceManager {
setExistingTableConfig(tableConfig, tableNameWithType, type);
if (type == TableType.REALTIME) {
+ // Check if HLC table is allowed
+ verifyIndexingConfig(tableNameWithType, newConfigs);
ensureRealtimeClusterIsSetUp(tableConfig, tableName, newConfigs);
}
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 14b0c2c..2f145a8 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -21,15 +21,24 @@ package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Function;
import java.io.File;
+import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import org.apache.avro.reflect.Nullable;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.io.FileUtils;
import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.config.IndexingConfig;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.config.TableCustomConfig;
import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.config.TenantConfig;
import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
@@ -70,6 +79,13 @@ public class LLCRealtimeClusterIntegrationTest extends
RealtimeClusterIntegratio
}
@Override
+ public void startController() {
+ ControllerConf controllerConfig = getDefaultControllerConfiguration();
+ controllerConfig.setHLCTablesAllowed(false);
+ startController(controllerConfig);
+ }
+
+ @Override
protected boolean useLlc() {
return true;
}
@@ -142,5 +158,29 @@ public class LLCRealtimeClusterIntegrationTest extends
RealtimeClusterIntegratio
}
}, 600_000L, "Failed to generate inverted index");
}
+
+ @Test
+ public void testAddHLCTableShouldFail() {
+ TableConfig tableConfig = new TableConfig();
+ IndexingConfig indexingConfig = new IndexingConfig();
+ Map<String, String> streamConfigs = new HashMap<>();
+ streamConfigs.put("stream.kafka.consumer.type", "HIGHLEVEL");
+ indexingConfig.setStreamConfigs(streamConfigs);
+ tableConfig.setIndexingConfig(indexingConfig);
+ tableConfig.setTableName("testTable");
+ tableConfig.setTableType(CommonConstants.Helix.TableType.REALTIME);
+ SegmentsValidationAndRetentionConfig validationAndRetentionConfig = new
SegmentsValidationAndRetentionConfig();
+ tableConfig.setValidationConfig(validationAndRetentionConfig);
+ TenantConfig tenantConfig = new TenantConfig();
+ tableConfig.setTenantConfig(tenantConfig);
+ TableCustomConfig tableCustomConfig = new TableCustomConfig();
+ tableConfig.setCustomConfig(tableCustomConfig);
+ try {
+ sendPostRequest(_controllerRequestURLBuilder.forTableCreate(),
tableConfig.toJsonConfigString());
+ Assert.fail();
+ } catch (IOException e) {
+ // Expected
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]