This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch helix_debug in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 5b9cfaff5aabe4195220191902b5f4cfb5d58e87 Author: Jackie (Xiaotian) Jiang <[email protected]> AuthorDate: Tue Jul 9 22:32:04 2019 -0700 Improve controller tests --- .../broker/broker/helix/ClusterChangeMediator.java | 12 +- .../apache/pinot/controller/ControllerConf.java | 22 +-- .../apache/pinot/controller/ControllerStarter.java | 15 +- .../api/resources/PinotControllerHealthCheck.java | 4 +- .../resources/PinotTableConfigRestletResource.java | 2 + .../api/resources/PinotTableRestletResource.java | 10 +- .../helix/core/PinotHelixResourceManager.java | 33 ++-- .../helix/core/SegmentDeletionManager.java | 17 +- .../helix/core/util/HelixSetupUtils.java | 180 ++++++++------------- .../controller/validation/StorageQuotaChecker.java | 12 +- .../helix/ControllerPeriodicTaskStarterTest.java | 23 +-- .../pinot/controller/helix/ControllerTest.java | 47 ++---- .../controller/helix/PinotControllerModeTest.java | 10 +- .../tests/BaseClusterIntegrationTest.java | 3 +- .../tests/OfflineClusterIntegrationTest.java | 13 +- 15 files changed, 145 insertions(+), 258 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java index 64b2a1e..72be04d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java @@ -62,7 +62,7 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan private final Thread _clusterChangeHandlingThread; - private volatile boolean _stopped = false; + private boolean _stopped = false; public ClusterChangeMediator(Map<ChangeType, List<ClusterChangeHandler>> changeHandlersMap, BrokerMetrics brokerMetrics) { @@ -145,7 +145,7 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan /** * Starts the cluster change mediator. */ - public void start() { + public synchronized void start() { LOGGER.info("Starting the cluster change handling thread"); _clusterChangeHandlingThread.start(); } @@ -153,7 +153,7 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan /** * Stops the cluster change mediator. */ - public void stop() { + public synchronized void stop() { LOGGER.info("Stopping the cluster change handling thread"); _stopped = true; synchronized (_lastChangeTimeMap) { @@ -197,7 +197,11 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan * * @param changeType Type of the change */ - private void enqueueChange(ChangeType changeType) { + private synchronized void enqueueChange(ChangeType changeType) { + // Do not enqueue changes if already stopped + if (_stopped) { + return; + } if (_clusterChangeHandlingThread.isAlive()) { LOGGER.info("Enqueue {} change", changeType); synchronized (_lastChangeTimeMap) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index 697fcf3..414d3bc 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -21,7 +21,6 @@ package org.apache.pinot.controller; import java.io.File; import java.io.UnsupportedEncodingException; import java.net.URI; -import java.net.URISyntaxException; import java.net.URLEncoder; import java.util.ArrayList; import java.util.List; @@ -59,9 +58,7 @@ public class ControllerConf extends PropertiesConfiguration { private static final String CONTROLLER_MODE = "controller.mode"; public enum ControllerMode { - DUAL, - PINOT_ONLY, - HELIX_ONLY + DUAL, PINOT_ONLY, HELIX_ONLY } public static class ControllerPeriodicTasksConf { @@ -175,22 +172,17 @@ public class ControllerConf extends PropertiesConfiguration { * Returns the URI for the given path, appends the local (file) scheme to the URI if no scheme exists. */ public static URI getUriFromPath(String path) { - try { - URI uri = new URI(path); - if (uri.getScheme() != null) { - return uri; - } else { - return new URI(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME, path, null); - } - } catch (URISyntaxException e) { - LOGGER.error("Could not construct uri from path {}", path); - throw new RuntimeException(e); + URI uri = URI.create(path); + if (uri.getScheme() == null) { + uri = URI.create(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME + ":" + path); } + return uri; } public static URI constructSegmentLocation(String baseDataDir, String tableName, String segmentName) { try { - return getUriFromPath(StringUtil.join(File.separator, baseDataDir, tableName, URLEncoder.encode(segmentName, "UTF-8"))); + return getUriFromPath( + StringUtil.join(File.separator, baseDataDir, tableName, URLEncoder.encode(segmentName, "UTF-8"))); } catch (UnsupportedEncodingException e) { LOGGER .error("Could not construct segment location with baseDataDir {}, tableName {}, segmentName {}", baseDataDir, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java index c313616..ac7dbff 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java @@ -19,6 +19,7 @@ package org.apache.pinot.controller; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.yammer.metrics.core.MetricsRegistry; @@ -236,12 +237,10 @@ public class ControllerStarter { } private void setUpPinotController() { - // Note: Right now we don't allow pinot-only mode to be used in production yet. - // Now we only have this mode used in tests. - // TODO: Remove this logic once all the helix separation PRs are committed. - if (_controllerMode == ControllerConf.ControllerMode.PINOT_ONLY && !isPinotOnlyModeSupported()) { - throw new RuntimeException("Pinot only controller currently isn't supported in production yet."); - } + // Note: Right now we don't allow Pinot-only controller as ControllerLeadershipManager is setup in Helix controller + // and Pinot controller relies on it + // TODO: Remove ControllerLeadershipManager + Preconditions.checkState(_controllerLeadershipManager != null); // Set up Pinot cluster in Helix HelixSetupUtils.setupPinotCluster(_helixClusterName, _helixZkURL, _isUpdateStateModel, _enableBatchMessageMode); @@ -506,10 +505,6 @@ public class ControllerStarter { } } - public boolean isPinotOnlyModeSupported() { - return false; - } - public MetricsRegistry getMetricsRegistry() { return _metricsRegistry; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java index e6c5182..7370085 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java @@ -32,14 +32,14 @@ import org.apache.pinot.controller.ControllerConf; @Api(tags = Constants.HEALTH_TAG) -@Path("/pinot-controller/admin") +@Path("/") public class PinotControllerHealthCheck { @Inject ControllerConf controllerConf; @GET - @Path("/") + @Path("pinot-controller/admin") @ApiOperation(value = "Check controller health") @ApiResponses(value = {@ApiResponse(code = 200, message = "Good")}) @Produces(MediaType.TEXT_PLAIN) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java index 01d5fdf..5717170 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java @@ -110,6 +110,8 @@ public class PinotTableConfigRestletResource { .type(MediaType.TEXT_PLAIN_TYPE).build(); } + // TODO: Fix the bug - when schema is not configured, after deserialization, CombinedConfig will have a non-null + // schema with null schema name if (config.getSchema() != null) { _resourceManager.addOrUpdateSchema(config.getSchema()); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index daf6f88..fe3491a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -62,7 +62,6 @@ import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants; -import org.apache.pinot.core.realtime.stream.StreamConfig; import org.apache.pinot.core.util.ReplicationUtils; import org.slf4j.LoggerFactory; @@ -252,14 +251,16 @@ public class PinotTableRestletResource { @ApiOperation(value = "Deletes a table", notes = "Deletes a table") public SuccessResponse deleteTable( @ApiParam(value = "Name of the table to delete", required = true) @PathParam("tableName") String tableName, - @ApiParam(value = "realtime|offline", required = false) @QueryParam("type") String tableTypeStr) { + @ApiParam(value = "realtime|offline") @QueryParam("type") String tableTypeStr) { List<String> tablesDeleted = new LinkedList<>(); try { - if (tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.OFFLINE.name())) { + if ((tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.OFFLINE.name())) + && !TableNameBuilder.REALTIME.tableHasTypeSuffix(tableName)) { _pinotHelixResourceManager.deleteOfflineTable(tableName); tablesDeleted.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName)); } - if (tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.REALTIME.name())) { + if ((tableTypeStr == null || tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.REALTIME.name())) + && !TableNameBuilder.OFFLINE.tableHasTypeSuffix(tableName)) { _pinotHelixResourceManager.deleteRealtimeTable(tableName); tablesDeleted.add(TableNameBuilder.REALTIME.tableNameWithType(tableName)); } @@ -373,7 +374,6 @@ public class PinotTableRestletResource { throw new PinotHelixResourceManager.InvalidTableConfigException(errorMsg, e); } - if (verifyReplication) { int requestReplication; try { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 5eeeddc..3c0ca0b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -160,10 +160,10 @@ public class PinotHelixResourceManager { public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) { this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), - CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + controllerConf.getControllerHost() + "_" - + controllerConf.getControllerPort(), controllerConf.getDataDir(), - controllerConf.getExternalViewOnlineToOfflineTimeout(), controllerConf.tenantIsolationEnabled(), - controllerConf.getEnableBatchMessageMode(), controllerConf.getHLCTablesAllowed()); + CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + controllerConf.getControllerHost() + "_" + controllerConf + .getControllerPort(), controllerConf.getDataDir(), controllerConf.getExternalViewOnlineToOfflineTimeout(), + controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(), + controllerConf.getHLCTablesAllowed()); } /** @@ -1087,9 +1087,6 @@ public class PinotHelixResourceManager { // lets add table configs ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord()); - _propertyStore.create(ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType), - new ZNRecord(tableNameWithType), AccessOption.PERSISTENT); - // Update replica group partition assignment to the property store if applicable updateReplicaGroupPartitionAssignment(tableConfig); break; @@ -1228,9 +1225,8 @@ public class PinotHelixResourceManager { servers = getInstancesWithTag(realtimeTagConfig.getConsumingServerTag()); } int numReplicas = ReplicationUtils.getReplication(tableConfig); - ReplicaGroupPartitionAssignment partitionAssignment = - partitionAssignmentGenerator.buildReplicaGroupPartitionAssignment(tableNameWithType, tableConfig, - numReplicas, servers); + ReplicaGroupPartitionAssignment partitionAssignment = partitionAssignmentGenerator + .buildReplicaGroupPartitionAssignment(tableNameWithType, tableConfig, numReplicas, servers); partitionAssignmentGenerator.writeReplicaGroupPartitionAssignment(partitionAssignment); } } @@ -1272,7 +1268,8 @@ public class PinotHelixResourceManager { // Check if HLC table is allowed. StreamConfig streamConfig = new StreamConfig(indexingConfig.getStreamConfigs()); if (streamConfig.hasHighLevelConsumerType() && !_allowHLCTables) { - throw new InvalidTableConfigException("Creating HLC realtime table is not allowed for Table: " + tableNameWithType); + throw new InvalidTableConfigException( + "Creating HLC realtime table is not allowed for Table: " + tableNameWithType); } } @@ -1468,8 +1465,8 @@ public class PinotHelixResourceManager { LOGGER.info("Deleting table {}: Finish", offlineTableName); } - public void deleteRealtimeTable(String rawTableName) { - String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); + public void deleteRealtimeTable(String tableName) { + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); LOGGER.info("Deleting table {}: Start", realtimeTableName); // Remove the table from brokerResource @@ -2071,14 +2068,12 @@ public class PinotHelixResourceManager { : PinotResourceManagerResponse.failure("Timed out. External view not completely updated"); } - public boolean hasRealtimeTable(String tableName) { - String actualTableName = tableName + "_REALTIME"; - return getAllTables().contains(actualTableName); + public boolean hasOfflineTable(String tableName) { + return getAllResources().contains(TableNameBuilder.OFFLINE.tableNameWithType(tableName)); } - public boolean hasOfflineTable(String tableName) { - String actualTableName = tableName + "_OFFLINE"; - return getAllTables().contains(actualTableName); + public boolean hasRealtimeTable(String tableName) { + return getAllResources().contains(TableNameBuilder.REALTIME.tableNameWithType(tableName)); } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java index 46c84c0..6415b06 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java @@ -172,16 +172,17 @@ public class SegmentDeletionManager { } protected void removeSegmentFromStore(String tableNameWithType, String segmentId) { - final String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + // Ignore HLC segments as they are not stored in Pinot FS + if (SegmentName.isHighLevelConsumerSegmentName(segmentId)) { + return; + } if (_dataDir != null) { - URI fileToMoveURI; - PinotFS pinotFS; - URI dataDirURI = ControllerConf.getUriFromPath(_dataDir); - fileToMoveURI = ControllerConf.constructSegmentLocation(_dataDir, rawTableName, segmentId); + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + URI fileToMoveURI = ControllerConf.constructSegmentLocation(_dataDir, rawTableName, segmentId); URI deletedSegmentDestURI = ControllerConf .constructSegmentLocation(StringUtil.join(File.separator, _dataDir, DELETED_SEGMENTS), rawTableName, segmentId); - pinotFS = PinotFSFactory.create(dataDirURI.getScheme()); + PinotFS pinotFS = PinotFSFactory.create(fileToMoveURI.getScheme()); try { if (pinotFS.exists(fileToMoveURI)) { @@ -197,9 +198,7 @@ public class SegmentDeletionManager { deletedSegmentDestURI.toString()); } } else { - if (!SegmentName.isHighLevelConsumerSegmentName(segmentId)) { - LOGGER.warn("Not found local segment file for segment {}" + fileToMoveURI.toString()); - } + LOGGER.warn("Not found local segment file for segment {}", fileToMoveURI.toString()); } } catch (IOException e) { LOGGER.warn("Could not move segment {} from {} to {}", segmentId, fileToMoveURI.toString(), diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java index 82c4cab..74682c8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java @@ -20,16 +20,10 @@ package org.apache.pinot.controller.helix.core.util; import com.google.common.base.Preconditions; import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.concurrent.TimeUnit; -import org.apache.helix.AccessOption; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; -import org.apache.helix.PropertyKey; -import org.apache.helix.PropertyPathBuilder; -import org.apache.helix.ZNRecord; import org.apache.helix.controller.HelixControllerMain; import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.manager.zk.ZKHelixAdmin; @@ -37,16 +31,15 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.manager.zk.client.HelixZkClient; +import org.apache.helix.manager.zk.client.SharedZkClientFactory; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; import org.apache.helix.model.IdealState; import org.apache.helix.model.MasterSlaveSMD; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.model.builder.HelixConfigScopeBuilder; -import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.utils.CommonConstants; -import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator; import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator; import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder; @@ -102,150 +95,103 @@ public class HelixSetupUtils { */ public static void setupPinotCluster(String helixClusterName, String zkPath, boolean isUpdateStateModel, boolean enableBatchMessageMode) { - final HelixAdmin admin = new ZKHelixAdmin(zkPath); - Preconditions.checkState(admin.getClusters().contains(helixClusterName), - String.format("Helix cluster: %s hasn't been set up", helixClusterName)); - - // Add segment state model definition if needed - addSegmentStateModelDefinitionIfNeeded(helixClusterName, admin, zkPath, isUpdateStateModel); - - // Add broker resource if needed - createBrokerResourceIfNeeded(helixClusterName, admin, enableBatchMessageMode); - - // Add lead controller resource if needed - createLeadControllerResourceIfNeeded(helixClusterName, admin, enableBatchMessageMode); - - // Init property store if needed - initPropertyStoreIfNeeded(helixClusterName, zkPath); + HelixZkClient zkClient = null; + try { + zkClient = SharedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(zkPath), + new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()).setConnectInitTimeout( + TimeUnit.SECONDS.toMillis(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC))); + zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS); + HelixAdmin helixAdmin = new ZKHelixAdmin(zkClient); + HelixDataAccessor helixDataAccessor = + new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<>(zkClient)); + + Preconditions.checkState(helixAdmin.getClusters().contains(helixClusterName), + String.format("Helix cluster: %s hasn't been set up", helixClusterName)); + + // Add segment state model definition if needed + addSegmentStateModelDefinitionIfNeeded(helixClusterName, helixAdmin, helixDataAccessor, isUpdateStateModel); + + // Add broker resource if needed + createBrokerResourceIfNeeded(helixClusterName, helixAdmin, enableBatchMessageMode); + + // Add lead controller resource if needed + createLeadControllerResourceIfNeeded(helixClusterName, helixAdmin, enableBatchMessageMode); + } finally { + if (zkClient != null) { + zkClient.close(); + } + } } - private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin admin, String zkPath, - boolean isUpdateStateModel) { - final String segmentStateModelName = + private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin helixAdmin, + HelixDataAccessor helixDataAccessor, boolean isUpdateStateModel) { + String segmentStateModelName = PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; - StateModelDefinition stateModelDefinition = admin.getStateModelDef(helixClusterName, segmentStateModelName); - if (stateModelDefinition == null) { - LOGGER.info("Adding state model {} (with CONSUMED state) generated using {}", segmentStateModelName, - PinotHelixSegmentOnlineOfflineStateModelGenerator.class.toString()); - admin.addStateModelDef(helixClusterName, segmentStateModelName, - PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition()); - } else if (isUpdateStateModel) { - final StateModelDefinition curStateModelDef = admin.getStateModelDef(helixClusterName, segmentStateModelName); - List<String> states = curStateModelDef.getStatesPriorityList(); - if (states.contains(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE)) { - LOGGER.info("State model {} already updated to contain CONSUMING state", segmentStateModelName); + StateModelDefinition stateModelDefinition = helixAdmin.getStateModelDef(helixClusterName, segmentStateModelName); + if (stateModelDefinition == null || isUpdateStateModel) { + if (stateModelDefinition == null) { + LOGGER.info("Adding state model: {} with CONSUMING state", segmentStateModelName); } else { - LOGGER.info("Updating {} to add states for low level consumers", segmentStateModelName); - StateModelDefinition newStateModelDef = - PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition(); - ZkClient zkClient = new ZkClient(zkPath); - zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS); - zkClient.setZkSerializer(new ZNRecordSerializer()); - HelixDataAccessor accessor = new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<>(zkClient)); - PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName), newStateModelDef); - LOGGER.info("Completed updating state model {}", segmentStateModelName); - zkClient.close(); + LOGGER.info("Updating state model: {} to contain CONSUMING state", segmentStateModelName); } + helixDataAccessor + .createStateModelDef(PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition()); } } - private static void createBrokerResourceIfNeeded(String helixClusterName, HelixAdmin admin, + private static void createBrokerResourceIfNeeded(String helixClusterName, HelixAdmin helixAdmin, boolean enableBatchMessageMode) { // Add broker resource online offline state model definition if needed - StateModelDefinition brokerResourceStateModelDefinition = admin.getStateModelDef(helixClusterName, + StateModelDefinition brokerResourceStateModelDefinition = helixAdmin.getStateModelDef(helixClusterName, PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL); if (brokerResourceStateModelDefinition == null) { LOGGER.info("Adding state model definition named : {} generated using : {}", PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL, PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.class.toString()); - admin.addStateModelDef(helixClusterName, + helixAdmin.addStateModelDef(helixClusterName, PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL, PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition()); } // Create broker resource if needed. - IdealState brokerResourceIdealState = - admin.getResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); - if (brokerResourceIdealState == null) { + if (helixAdmin.getResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE) == null) { LOGGER.info("Adding empty ideal state for Broker!"); - HelixHelper - .updateResourceConfigsFor(new HashMap<>(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, helixClusterName, - admin); - IdealState idealState = PinotTableIdealStateBuilder - .buildEmptyIdealStateForBrokerResource(admin, helixClusterName, enableBatchMessageMode); - admin.setResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState); + IdealState emptyIdealStateForBrokerResource = PinotTableIdealStateBuilder + .buildEmptyIdealStateForBrokerResource(helixAdmin, helixClusterName, enableBatchMessageMode); + helixAdmin.setResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, + emptyIdealStateForBrokerResource); } } - private static void createLeadControllerResourceIfNeeded(String helixClusterName, HelixAdmin admin, + private static void createLeadControllerResourceIfNeeded(String helixClusterName, HelixAdmin helixAdmin, boolean enableBatchMessageMode) { - StateModelDefinition masterSlaveStateModelDefinition = - admin.getStateModelDef(helixClusterName, MasterSlaveSMD.name); - if (masterSlaveStateModelDefinition == null) { - LOGGER.info("Adding state model definition named : {} generated using : {}", MasterSlaveSMD.name, - MasterSlaveSMD.class.toString()); - admin.addStateModelDef(helixClusterName, MasterSlaveSMD.name, MasterSlaveSMD.build()); - } - - IdealState leadControllerResourceIdealState = - admin.getResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME); - if (leadControllerResourceIdealState == null) { + if (helixAdmin.getResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME) == null) { LOGGER.info("Cluster {} doesn't contain {}. Creating one.", helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME); - HelixHelper.updateResourceConfigsFor(new HashMap<>(), LEAD_CONTROLLER_RESOURCE_NAME, helixClusterName, admin); - // FULL-AUTO Master-Slave state model with CrushED reBalance strategy. - admin.addResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, - CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE, MasterSlaveSMD.name, - IdealState.RebalanceMode.FULL_AUTO.toString(), CrushEdRebalanceStrategy.class.getName()); + // FULL-AUTO Master-Slave state model with CrushED rebalance strategy. + IdealState leadControllerResourceIdealState = new IdealState(LEAD_CONTROLLER_RESOURCE_NAME); + leadControllerResourceIdealState + .setNumPartitions(CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE); + leadControllerResourceIdealState.setStateModelDefRef(MasterSlaveSMD.name); + leadControllerResourceIdealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO); + leadControllerResourceIdealState.setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName()); + leadControllerResourceIdealState.setReplicas("0"); // Set instance group tag for lead controller resource. - IdealState leadControllerIdealState = - admin.getResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME); - leadControllerIdealState.setInstanceGroupTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE); - leadControllerIdealState.setBatchMessageMode(enableBatchMessageMode); + leadControllerResourceIdealState.setInstanceGroupTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE); + leadControllerResourceIdealState.setBatchMessageMode(enableBatchMessageMode); // The below config guarantees if active number of replicas is no less than minimum active replica, there will not be partition movements happened. // Set min active replicas to 0 and rebalance delay to 5 minutes so that if any master goes offline, Helix controller waits at most 5 minutes and then re-calculate the participant assignment. // This delay is helpful when periodic tasks are running and we don't want them to be re-run too frequently. // Plus, if virtual id is applied to controller hosts, swapping hosts would be easy as new hosts can use the same virtual id and it takes least effort to change the configs. - leadControllerIdealState.setMinActiveReplicas(MIN_ACTIVE_REPLICAS); - leadControllerIdealState.setRebalanceDelay(REBALANCE_DELAY_MS); - leadControllerIdealState.setDelayRebalanceEnabled(ENABLE_DELAY_REBALANCE); - admin.setResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, leadControllerIdealState); - + leadControllerResourceIdealState.setMinActiveReplicas(MIN_ACTIVE_REPLICAS); + leadControllerResourceIdealState.setRebalanceDelay(REBALANCE_DELAY_MS); + leadControllerResourceIdealState.setDelayRebalanceEnabled(ENABLE_DELAY_REBALANCE); // Explicitly disable this resource when creating this new resource. // When all the controllers are running the code with the logic to handle this resource, it can be enabled for backward compatibility. // In the next major release, we can enable this resource by default, so that all the controller logic can be separated. - admin.enableResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, false); + leadControllerResourceIdealState.enable(false); - LOGGER.info("Re-balance lead controller resource with replicas: {}", - CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT); - // Set it to 1 so that there's only 1 instance (i.e. master) shown in every partitions. - admin.rebalance(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, - CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT); - } - } - - private static void initPropertyStoreIfNeeded(String helixClusterName, String zkPath) { - String propertyStorePath = PropertyPathBuilder.propertyStore(helixClusterName); - ZkHelixPropertyStore<ZNRecord> propertyStore = - new ZkHelixPropertyStore<>(zkPath, new ZNRecordSerializer(), propertyStorePath); - if (!propertyStore.exists("/CONFIGS", AccessOption.PERSISTENT)) { - propertyStore.create("/CONFIGS", new ZNRecord(""), AccessOption.PERSISTENT); - } - if (!propertyStore.exists("/CONFIGS/CLUSTER", AccessOption.PERSISTENT)) { - propertyStore.create("/CONFIGS/CLUSTER", new ZNRecord(""), AccessOption.PERSISTENT); - } - if (!propertyStore.exists("/CONFIGS/TABLE", AccessOption.PERSISTENT)) { - propertyStore.create("/CONFIGS/TABLE", new ZNRecord(""), AccessOption.PERSISTENT); - } - if (!propertyStore.exists("/CONFIGS/INSTANCE", AccessOption.PERSISTENT)) { - propertyStore.create("/CONFIGS/INSTANCE", new ZNRecord(""), AccessOption.PERSISTENT); - } - if (!propertyStore.exists("/SCHEMAS", AccessOption.PERSISTENT)) { - propertyStore.create("/SCHEMAS", new ZNRecord(""), AccessOption.PERSISTENT); - } - if (!propertyStore.exists("/SEGMENTS", AccessOption.PERSISTENT)) { - propertyStore.create("/SEGMENTS", new ZNRecord(""), AccessOption.PERSISTENT); + helixAdmin.addResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, leadControllerResourceIdealState); } } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java index db23301..efc1640 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java @@ -105,14 +105,16 @@ public class StorageQuotaChecker { if (quotaConfig == null || Strings.isNullOrEmpty(quotaConfig.getStorage())) { // no quota configuration...so ignore for backwards compatibility - LOGGER.warn("Quota configuration not set for table: {}", tableNameWithType); - return success("Quota configuration not set for table: " + tableNameWithType); + LOGGER.info("Storage quota is not configured for table: {}, skipping the check", tableNameWithType); + return success("Storage quota is not configured for table: " + tableNameWithType); } long allowedStorageBytes = numReplicas * quotaConfig.storageSizeBytes(); - if (allowedStorageBytes < 0) { - LOGGER.warn("Storage quota is not configured for table: {}", tableNameWithType); - return success("Storage quota is not configured for table: " + tableNameWithType); + if (allowedStorageBytes <= 0) { + LOGGER.warn("Invalid storage quota: {} for table: {}, skipping the check", quotaConfig.getStorage(), + tableNameWithType); + return success( + String.format("Invalid storage quota: %s for table: %s", quotaConfig.getStorage(), tableNameWithType)); } _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_QUOTA, allowedStorageBytes); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java index 8f72a59..59d5003 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java @@ -30,7 +30,6 @@ import org.testng.annotations.Test; public class ControllerPeriodicTaskStarterTest extends ControllerTest { - private MockControllerStarter _mockControllerStarter; @BeforeClass public void setup() { @@ -51,27 +50,11 @@ public class ControllerPeriodicTaskStarterTest extends ControllerTest { } @Override - protected void startControllerStarter(ControllerConf config) { - _mockControllerStarter = new MockControllerStarter(config); - _mockControllerStarter.start(); - _helixResourceManager = _mockControllerStarter.getHelixResourceManager(); - _helixManager = _mockControllerStarter.getHelixControllerManager(); + protected ControllerStarter getControllerStarter(ControllerConf config) { + return new MockControllerStarter(config); } - @Override - protected void stopControllerStarter() { - Assert.assertNotNull(_mockControllerStarter); - - _mockControllerStarter.stop(); - _mockControllerStarter = null; - } - - @Override - protected ControllerStarter getControllerStarter() { - return _mockControllerStarter; - } - - private class MockControllerStarter extends TestOnlyControllerStarter { + private class MockControllerStarter extends ControllerStarter { private static final int NUM_PERIODIC_TASKS = 7; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index 9893377..9eadb70 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.controller.helix; +import com.google.common.base.Preconditions; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; @@ -38,7 +39,6 @@ import org.apache.commons.io.IOUtils; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; -import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.data.DimensionFieldSpec; import org.apache.pinot.common.data.FieldSpec; @@ -66,7 +66,6 @@ public abstract class ControllerTest { protected ControllerRequestURLBuilder _controllerRequestURLBuilder; protected String _controllerDataDir; - protected ZkClient _zkClient; protected ControllerStarter _controllerStarter; protected PinotHelixResourceManager _helixResourceManager; protected HelixManager _helixManager; @@ -95,58 +94,33 @@ public abstract class ControllerTest { } } - public static ControllerConf getDefaultControllerConfiguration() { + public ControllerConf getDefaultControllerConfiguration() { ControllerConf config = new ControllerConf(); config.setControllerHost(LOCAL_HOST); config.setControllerPort(Integer.toString(DEFAULT_CONTROLLER_PORT)); config.setDataDir(DEFAULT_DATA_DIR); config.setZkStr(ZkStarter.DEFAULT_ZK_STR); + config.setHelixClusterName(getHelixClusterName()); return config; } - public class TestOnlyControllerStarter extends ControllerStarter { - - TestOnlyControllerStarter(ControllerConf conf) { - super(conf); - } - - @Override - public boolean isPinotOnlyModeSupported() { - return true; - } - } - protected void startController() { startController(getDefaultControllerConfiguration()); } protected void startController(ControllerConf config) { - startController(config, true); - } - - protected void startController(ControllerConf config, boolean deleteCluster) { - Assert.assertNotNull(config); - Assert.assertNull(_controllerStarter); + Preconditions.checkState(_controllerStarter == null); _controllerPort = Integer.valueOf(config.getControllerPort()); _controllerBaseApiUrl = "http://localhost:" + _controllerPort; _controllerRequestURLBuilder = ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl); _controllerDataDir = config.getDataDir(); - String helixClusterName = getHelixClusterName(); - config.setHelixClusterName(helixClusterName); - - String zkStr = config.getZkStr(); - _zkClient = new ZkClient(zkStr); - if (_zkClient.exists("/" + helixClusterName) && deleteCluster) { - _zkClient.deleteRecursive("/" + helixClusterName); - } - startControllerStarter(config); // HelixResourceManager is null in Helix only mode, while HelixManager is null in Pinot only mode. - switch (getControllerStarter().getControllerMode()) { + switch (_controllerStarter.getControllerMode()) { case DUAL: case PINOT_ONLY: _helixAdmin = _helixResourceManager.getHelixAdmin(); @@ -160,16 +134,19 @@ public abstract class ControllerTest { } protected void startControllerStarter(ControllerConf config) { - _controllerStarter = new TestOnlyControllerStarter(config); + _controllerStarter = getControllerStarter(config); _controllerStarter.start(); _helixResourceManager = _controllerStarter.getHelixResourceManager(); _helixManager = _controllerStarter.getHelixControllerManager(); } + protected ControllerStarter getControllerStarter(ControllerConf config) { + return new ControllerStarter(config); + } + protected void stopController() { stopControllerStarter(); FileUtils.deleteQuietly(new File(_controllerDataDir)); - _zkClient.close(); } protected void stopControllerStarter() { @@ -179,10 +156,6 @@ public abstract class ControllerTest { _controllerStarter = null; } - protected ControllerStarter getControllerStarter() { - return _controllerStarter; - } - protected Schema createDummySchema(String tableName) { Schema schema = new Schema(); schema.setSchemaName(tableName); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java index d91e612..bcd9bf8 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java @@ -78,12 +78,11 @@ public class PinotControllerModeTest extends ControllerTest { // Starting a second dual-mode controller. Helix cluster has already been set up. ControllerConf controllerConfig = getDefaultControllerConfiguration(); - controllerConfig.setHelixClusterName(getHelixClusterName()); controllerConfig.setControllerMode(ControllerConf.ControllerMode.DUAL); controllerConfig.setControllerPort( Integer.toString(Integer.parseInt(this.config.getControllerPort()) + controllerPortOffset++)); - ControllerStarter secondDualModeController = new TestOnlyControllerStarter(controllerConfig); + ControllerStarter secondDualModeController = getControllerStarter(controllerConfig); secondDualModeController.start(); TestUtils .waitForCondition(aVoid -> secondDualModeController.getHelixResourceManager().getHelixZkManager().isConnected(), @@ -113,7 +112,6 @@ public class PinotControllerModeTest extends ControllerTest { // Starting a helix controller. ControllerConf config2 = getDefaultControllerConfiguration(); - config2.setHelixClusterName(getHelixClusterName()); config2.setControllerMode(ControllerConf.ControllerMode.HELIX_ONLY); config2.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++)); ControllerStarter helixControllerStarter = new ControllerStarter(config2); @@ -128,11 +126,10 @@ public class PinotControllerModeTest extends ControllerTest { // Starting a pinot only controller. ControllerConf config3 = getDefaultControllerConfiguration(); - config3.setHelixClusterName(getHelixClusterName()); config3.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY); config3.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++)); - ControllerStarter firstPinotOnlyController = new TestOnlyControllerStarter(config3); + ControllerStarter firstPinotOnlyController = getControllerStarter(config3); firstPinotOnlyController.start(); PinotHelixResourceManager firstPinotOnlyPinotHelixResourceManager = firstPinotOnlyController.getHelixResourceManager(); @@ -143,11 +140,10 @@ public class PinotControllerModeTest extends ControllerTest { // Start a second Pinot only controller. ControllerConf config4 = getDefaultControllerConfiguration(); - config4.setHelixClusterName(getHelixClusterName()); config4.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY); config4.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++)); - ControllerStarter secondControllerStarter = new TestOnlyControllerStarter(config4); + ControllerStarter secondControllerStarter = getControllerStarter(config4); secondControllerStarter.start(); // Two controller instances assigned to cluster. TestUtils diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index 914adda..1a3b7e6 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -71,7 +71,6 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName()); protected final File _avroDir = new File(_tempDir, "avroDir"); - protected final File _preprocessingDir = new File(_tempDir, "preprocessingDir"); protected final File _segmentDir = new File(_tempDir, "segmentDir"); protected final File _tarDir = new File(_tempDir, "tarDir"); protected List<KafkaServerStartable> _kafkaStarters; @@ -184,7 +183,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { } @Nullable - protected String getServerTenant() { + protected String getServerTenant() { return TagNameUtils.DEFAULT_TENANT_NAME; } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 9ae4eef..bd8ab1a 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -19,8 +19,8 @@ package org.apache.pinot.integration.tests; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; import java.io.File; import java.io.IOException; @@ -41,7 +41,6 @@ import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.JsonUtils; import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.core.indexsegment.generator.SegmentVersion; -import org.apache.pinot.core.plan.SelectionPlanNode; import org.apache.pinot.util.TestUtils; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -611,13 +610,16 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet pqlQuery = "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch; JsonNode response2 = postQuery(pqlQuery); - pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch + " OR timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch; + pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch + + " OR timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch; JsonNode response3 = postQuery(pqlQuery); - pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch + " AND timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch; + pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch + + " AND timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch; JsonNode response4 = postQuery(pqlQuery); - pqlQuery = "SELECT count(*) FROM mytable WHERE DIV(timeConvert(DaysSinceEpoch,'DAYS','SECONDS'),1) = " + secondsSinceEpoch; + pqlQuery = + "SELECT count(*) FROM mytable WHERE DIV(timeConvert(DaysSinceEpoch,'DAYS','SECONDS'),1) = " + secondsSinceEpoch; JsonNode response5 = postQuery(pqlQuery); double val1 = response1.get("aggregationResults").get(0).get("value").asDouble(); @@ -653,7 +655,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet double val2 = response2.get("aggregationResults").get(0).get("value").asDouble(); Assert.assertEquals(val1, val2); } - } @AfterClass --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
