This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 626c43a Misc fix for controller tests (#4431)
626c43a is described below
commit 626c43a37483af8b32e485ad241962935bd9232f
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Jul 12 20:16:53 2019 -0700
Misc fix for controller tests (#4431)
---
.../api/resources/PinotControllerHealthCheck.java | 4 +--
.../resources/PinotTableConfigRestletResource.java | 2 ++
.../api/resources/PinotTableRestletResource.java | 10 +++----
.../helix/core/PinotHelixResourceManager.java | 34 +++++++++-------------
.../helix/core/SegmentDeletionManager.java | 10 ++++---
.../controller/validation/StorageQuotaChecker.java | 32 ++++++++++----------
.../tests/BaseClusterIntegrationTest.java | 1 -
.../tests/OfflineClusterIntegrationTest.java | 13 +++++----
8 files changed, 52 insertions(+), 54 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
index e6c5182..7370085 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerHealthCheck.java
@@ -32,14 +32,14 @@ import org.apache.pinot.controller.ControllerConf;
@Api(tags = Constants.HEALTH_TAG)
-@Path("/pinot-controller/admin")
+@Path("/")
public class PinotControllerHealthCheck {
@Inject
ControllerConf controllerConf;
@GET
- @Path("/")
+ @Path("pinot-controller/admin")
@ApiOperation(value = "Check controller health")
@ApiResponses(value = {@ApiResponse(code = 200, message = "Good")})
@Produces(MediaType.TEXT_PLAIN)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
index 01d5fdf..5717170 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
@@ -110,6 +110,8 @@ public class PinotTableConfigRestletResource {
.type(MediaType.TEXT_PLAIN_TYPE).build();
}
+ // TODO: Fix the bug - when schema is not configured, after
deserialization, CombinedConfig will have a non-null
+ // schema with null schema name
if (config.getSchema() != null) {
_resourceManager.addOrUpdateSchema(config.getSchema());
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index daf6f88..fe3491a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -62,7 +62,6 @@ import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
import
org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.apache.pinot.core.util.ReplicationUtils;
import org.slf4j.LoggerFactory;
@@ -252,14 +251,16 @@ public class PinotTableRestletResource {
@ApiOperation(value = "Deletes a table", notes = "Deletes a table")
public SuccessResponse deleteTable(
@ApiParam(value = "Name of the table to delete", required = true)
@PathParam("tableName") String tableName,
- @ApiParam(value = "realtime|offline", required = false)
@QueryParam("type") String tableTypeStr) {
+ @ApiParam(value = "realtime|offline") @QueryParam("type") String
tableTypeStr) {
List<String> tablesDeleted = new LinkedList<>();
try {
- if (tableTypeStr == null ||
tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.OFFLINE.name())) {
+ if ((tableTypeStr == null ||
tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.OFFLINE.name()))
+ && !TableNameBuilder.REALTIME.tableHasTypeSuffix(tableName)) {
_pinotHelixResourceManager.deleteOfflineTable(tableName);
tablesDeleted.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
}
- if (tableTypeStr == null ||
tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.REALTIME.name()))
{
+ if ((tableTypeStr == null ||
tableTypeStr.equalsIgnoreCase(CommonConstants.Helix.TableType.REALTIME.name()))
+ && !TableNameBuilder.OFFLINE.tableHasTypeSuffix(tableName)) {
_pinotHelixResourceManager.deleteRealtimeTable(tableName);
tablesDeleted.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
}
@@ -373,7 +374,6 @@ public class PinotTableRestletResource {
throw new
PinotHelixResourceManager.InvalidTableConfigException(errorMsg, e);
}
-
if (verifyReplication) {
int requestReplication;
try {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 5eeeddc..0cf2f50 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -117,7 +117,6 @@ import org.slf4j.LoggerFactory;
public class PinotHelixResourceManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotHelixResourceManager.class);
- private static final long DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS =
120_000L; // 2 minutes
private static final long DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS
= 500L;
private static final RetryPolicy DEFAULT_RETRY_POLICY =
RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
public static final String APPEND = "APPEND";
@@ -160,10 +159,10 @@ public class PinotHelixResourceManager {
public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
- CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE +
controllerConf.getControllerHost() + "_"
- + controllerConf.getControllerPort(), controllerConf.getDataDir(),
- controllerConf.getExternalViewOnlineToOfflineTimeout(),
controllerConf.tenantIsolationEnabled(),
- controllerConf.getEnableBatchMessageMode(),
controllerConf.getHLCTablesAllowed());
+ CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE +
controllerConf.getControllerHost() + "_" + controllerConf
+ .getControllerPort(), controllerConf.getDataDir(),
controllerConf.getExternalViewOnlineToOfflineTimeout(),
+ controllerConf.tenantIsolationEnabled(),
controllerConf.getEnableBatchMessageMode(),
+ controllerConf.getHLCTablesAllowed());
}
/**
@@ -1087,9 +1086,6 @@ public class PinotHelixResourceManager {
// lets add table configs
ZKMetadataProvider.setOfflineTableConfig(_propertyStore,
tableNameWithType, tableConfig.toZNRecord());
-
_propertyStore.create(ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType),
- new ZNRecord(tableNameWithType), AccessOption.PERSISTENT);
-
// Update replica group partition assignment to the property store if
applicable
updateReplicaGroupPartitionAssignment(tableConfig);
break;
@@ -1228,9 +1224,8 @@ public class PinotHelixResourceManager {
servers =
getInstancesWithTag(realtimeTagConfig.getConsumingServerTag());
}
int numReplicas = ReplicationUtils.getReplication(tableConfig);
- ReplicaGroupPartitionAssignment partitionAssignment =
-
partitionAssignmentGenerator.buildReplicaGroupPartitionAssignment(tableNameWithType,
tableConfig,
- numReplicas, servers);
+ ReplicaGroupPartitionAssignment partitionAssignment =
partitionAssignmentGenerator
+ .buildReplicaGroupPartitionAssignment(tableNameWithType,
tableConfig, numReplicas, servers);
partitionAssignmentGenerator.writeReplicaGroupPartitionAssignment(partitionAssignment);
}
}
@@ -1272,7 +1267,8 @@ public class PinotHelixResourceManager {
// Check if HLC table is allowed.
StreamConfig streamConfig = new
StreamConfig(indexingConfig.getStreamConfigs());
if (streamConfig.hasHighLevelConsumerType() && !_allowHLCTables) {
- throw new InvalidTableConfigException("Creating HLC realtime table is
not allowed for Table: " + tableNameWithType);
+ throw new InvalidTableConfigException(
+ "Creating HLC realtime table is not allowed for Table: " +
tableNameWithType);
}
}
@@ -1468,8 +1464,8 @@ public class PinotHelixResourceManager {
LOGGER.info("Deleting table {}: Finish", offlineTableName);
}
- public void deleteRealtimeTable(String rawTableName) {
- String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+ public void deleteRealtimeTable(String tableName) {
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
LOGGER.info("Deleting table {}: Start", realtimeTableName);
// Remove the table from brokerResource
@@ -2071,14 +2067,12 @@ public class PinotHelixResourceManager {
: PinotResourceManagerResponse.failure("Timed out. External view not
completely updated");
}
- public boolean hasRealtimeTable(String tableName) {
- String actualTableName = tableName + "_REALTIME";
- return getAllTables().contains(actualTableName);
+ public boolean hasOfflineTable(String tableName) {
+ return
getAllResources().contains(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
}
- public boolean hasOfflineTable(String tableName) {
- String actualTableName = tableName + "_OFFLINE";
- return getAllTables().contains(actualTableName);
+ public boolean hasRealtimeTable(String tableName) {
+ return
getAllResources().contains(TableNameBuilder.REALTIME.tableNameWithType(tableName));
}
/**
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index b2ab8b3..53bcb61 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -170,8 +170,12 @@ public class SegmentDeletionManager {
}
protected void removeSegmentFromStore(String tableNameWithType, String
segmentId) {
- final String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ // Ignore HLC segments as they are not stored in Pinot FS
+ if (SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
+ return;
+ }
if (_dataDir != null) {
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
URI fileToMoveURI = URIUtils.getUri(_dataDir, rawTableName,
URIUtils.encode(segmentId));
URI deletedSegmentDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS,
rawTableName, URIUtils.encode(segmentId));
PinotFS pinotFS = PinotFSFactory.create(fileToMoveURI.getScheme());
@@ -190,9 +194,7 @@ public class SegmentDeletionManager {
deletedSegmentDestURI.toString());
}
} else {
- if (!SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
- LOGGER.warn("Not found local segment file for segment {}" +
fileToMoveURI.toString());
- }
+ LOGGER.warn("Failed to find local segment file for segment {}",
fileToMoveURI.toString());
}
} catch (IOException e) {
LOGGER.warn("Could not move segment {} from {} to {}", segmentId,
fileToMoveURI.toString(),
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index db23301..e895242 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -21,8 +21,6 @@ package org.apache.pinot.controller.validation;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.io.File;
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.config.QuotaConfig;
import org.apache.pinot.common.config.TableConfig;
@@ -83,15 +81,12 @@ public class StorageQuotaChecker {
* @param segmentFile untarred segment. This should not be null.
* segmentFile must exist on disk and must be a directory
* @param segmentName name of the segment being added
- * @param timeoutMsec timeout in milliseconds for reading table sizes from
server
+ * @param timeoutMs timeout in milliseconds for reading table sizes from
server
*
*/
- public QuotaCheckerResponse isSegmentStorageWithinQuota(@Nonnull File
segmentFile, @Nonnull String segmentName,
- @Nonnegative int timeoutMsec)
+ public QuotaCheckerResponse isSegmentStorageWithinQuota(File segmentFile,
String segmentName, int timeoutMs)
throws InvalidConfigException {
- Preconditions.checkNotNull(segmentFile);
- Preconditions.checkNotNull(segmentName);
- Preconditions.checkArgument(timeoutMsec > 0, "Timeout value must be > 0,
input: %s", timeoutMsec);
+ Preconditions.checkArgument(timeoutMs > 0, "Timeout value must be > 0,
input: %s", timeoutMs);
Preconditions.checkArgument(segmentFile.exists(), "Segment file: %s does
not exist", segmentFile);
Preconditions.checkArgument(segmentFile.isDirectory(), "Segment file: %s
is not a directory", segmentFile);
@@ -105,23 +100,28 @@ public class StorageQuotaChecker {
if (quotaConfig == null ||
Strings.isNullOrEmpty(quotaConfig.getStorage())) {
// no quota configuration...so ignore for backwards compatibility
- LOGGER.warn("Quota configuration not set for table: {}",
tableNameWithType);
- return success("Quota configuration not set for table: " +
tableNameWithType);
+ String message =
+ String.format("Storage quota is not configured for table: %s,
skipping the check", tableNameWithType);
+ LOGGER.info(message);
+ return success(message);
}
long allowedStorageBytes = numReplicas * quotaConfig.storageSizeBytes();
- if (allowedStorageBytes < 0) {
- LOGGER.warn("Storage quota is not configured for table: {}",
tableNameWithType);
- return success("Storage quota is not configured for table: " +
tableNameWithType);
+ if (allowedStorageBytes <= 0) {
+ String message = String
+ .format("Invalid storage quota: %s for table: %s, skipping the
check", quotaConfig.getStorage(),
+ tableNameWithType);
+ LOGGER.warn(message);
+ return success(message);
}
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.TABLE_QUOTA, allowedStorageBytes);
long incomingSegmentSizeBytes = FileUtils.sizeOfDirectory(segmentFile);
// read table size
- TableSizeReader.TableSubTypeSizeDetails tableSubtypeSize = null;
+ TableSizeReader.TableSubTypeSizeDetails tableSubtypeSize;
try {
- tableSubtypeSize =
_tableSizeReader.getTableSubtypeSize(tableNameWithType, timeoutMsec);
+ tableSubtypeSize =
_tableSizeReader.getTableSubtypeSize(tableNameWithType, timeoutMs);
} catch (InvalidConfigException e) {
LOGGER.error("Failed to get table size for table {}", tableNameWithType,
e);
throw e;
@@ -157,7 +157,7 @@ public class StorageQuotaChecker {
tableNameWithType, tableSubtypeSize.estimatedSizeInBytes,
tableSubtypeSize.reportedSizeInBytes);
// Only emit the real percentage of storage quota usage by lead
controller, otherwise emit 0L.
- if (isLeader() && allowedStorageBytes != 0L) {
+ if (isLeader()) {
long existingStorageQuotaUtilization =
tableSubtypeSize.estimatedSizeInBytes * 100 / allowedStorageBytes;
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION,
existingStorageQuotaUtilization);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 2eb17f5..67fabc6 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -71,7 +71,6 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
protected final File _tempDir = new File(FileUtils.getTempDirectory(),
getClass().getSimpleName());
protected final File _avroDir = new File(_tempDir, "avroDir");
- protected final File _preprocessingDir = new File(_tempDir,
"preprocessingDir");
protected final File _segmentDir = new File(_tempDir, "segmentDir");
protected final File _tarDir = new File(_tempDir, "tarDir");
protected List<KafkaServerStartable> _kafkaStarters;
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 71598b9..75358b1 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -19,8 +19,8 @@
package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
@@ -41,7 +41,6 @@ import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.JsonUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.core.plan.SelectionPlanNode;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -611,13 +610,16 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
pqlQuery = "SELECT count(*) FROM mytable WHERE
timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch;
JsonNode response2 = postQuery(pqlQuery);
- pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " +
daysSinceEpoch + " OR timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " +
secondsSinceEpoch;
+ pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " +
daysSinceEpoch
+ + " OR timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " +
secondsSinceEpoch;
JsonNode response3 = postQuery(pqlQuery);
- pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " +
daysSinceEpoch + " AND timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " +
secondsSinceEpoch;
+ pqlQuery = "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " +
daysSinceEpoch
+ + " AND timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " +
secondsSinceEpoch;
JsonNode response4 = postQuery(pqlQuery);
- pqlQuery = "SELECT count(*) FROM mytable WHERE
DIV(timeConvert(DaysSinceEpoch,'DAYS','SECONDS'),1) = " + secondsSinceEpoch;
+ pqlQuery =
+ "SELECT count(*) FROM mytable WHERE
DIV(timeConvert(DaysSinceEpoch,'DAYS','SECONDS'),1) = " + secondsSinceEpoch;
JsonNode response5 = postQuery(pqlQuery);
double val1 =
response1.get("aggregationResults").get(0).get("value").asDouble();
@@ -653,7 +655,6 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
double val2 =
response2.get("aggregationResults").get(0).get("value").asDouble();
Assert.assertEquals(val1, val2);
}
-
}
@AfterClass
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]