This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch fix_service_status in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 27d168d85a47c7093cd60347bb688edbba544511 Author: Jackie (Xiaotian) Jiang <[email protected]> AuthorDate: Tue Apr 2 21:54:41 2019 -0700 In ServiceStatus, replace Apache math Pair with inner StatusDescriptionPair class Also fix the integration test to correctly include resources in ServiceStatus --- .../apache/pinot/common/utils/ServiceStatus.java | 64 +++++++------ .../tests/OfflineClusterIntegrationTest.java | 103 +++++++++------------ 2 files changed, 83 insertions(+), 84 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java index 3fc8d01..5beea88 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java @@ -24,7 +24,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.commons.math3.util.Pair; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; @@ -49,12 +48,11 @@ public class ServiceStatus { STARTING, GOOD, BAD } - public static String STATUS_DESCRIPTION_NONE = "None"; - public static String STATUS_DESCRIPTION_INIT = "Init"; + public static final String STATUS_DESCRIPTION_NONE = "None"; + public static final String STATUS_DESCRIPTION_INIT = "Init"; + public static final String STATUS_DESCRIPTION_NO_HELIX_STATE = "Helix state does not exist"; - private static String _noHelixState = "Helix state does not exist"; - - private static int MAX_RESOURCE_NAMES_TO_LOG = 5; + private static final int MAX_RESOURCE_NAMES_TO_LOG = 5; /** * Callback that returns the status of the service. @@ -173,7 +171,8 @@ public class ServiceStatus { } } _numTotalResourcesToMonitor = _resourcesToMonitor.size(); - _minResourcesStartCount = (int)Math.round(Math.ceil(minResourcesStartPercent * _numTotalResourcesToMonitor/100)); + _minResourcesStartCount = + (int) Math.round(Math.ceil(minResourcesStartPercent * _numTotalResourcesToMonitor / 100)); LOGGER.info("Monitoring {} resources: {} for start up of instance {}", _numTotalResourcesToMonitor, getResourceListAsString(), _instanceName); @@ -189,7 +188,8 @@ public class ServiceStatus { _resourcesToMonitor = new HashSet<>(resourcesToMonitor); _numTotalResourcesToMonitor = _resourcesToMonitor.size(); - _minResourcesStartCount = (int)Math.round(Math.ceil(minResourcesStartPercent * _numTotalResourcesToMonitor/100)); + _minResourcesStartCount = + (int) Math.round(Math.ceil(minResourcesStartPercent * _numTotalResourcesToMonitor / 100)); LOGGER.info("Monitoring {} resources: {} for start up of instance {}", _numTotalResourcesToMonitor, getResourceListAsString(), _instanceName); } @@ -238,19 +238,18 @@ public class ServiceStatus { _resourceIterator = _resourcesToMonitor.iterator(); } resourceName = _resourceIterator.next(); - Pair<Status, String> statusPair = evaluateResourceStatus(resourceName); + StatusDescriptionPair statusDescriptionPair = evaluateResourceStatus(resourceName); - Status status = statusPair.getFirst(); - if (status == Status.GOOD) { + if (statusDescriptionPair._status == Status.GOOD) { // Resource is done starting up, remove it from the set _resourceIterator.remove(); } else { _statusDescription = String .format("%s, waitingFor=%s, resource=%s, numResourcesLeft=%d, numTotalResources=%d, minStartCount=%d,", - statusPair.getSecond(), getMatchName(), resourceName, _resourcesToMonitor.size(), + statusDescriptionPair._description, getMatchName(), resourceName, _resourcesToMonitor.size(), _numTotalResourcesToMonitor, _minResourcesStartCount); - return status; + return statusDescriptionPair._status; } } @@ -275,35 +274,36 @@ public class ServiceStatus { _resourceIterator = _resourcesToMonitor.iterator(); while (_resourceIterator.hasNext()) { String resource = _resourceIterator.next(); - Pair<Status, String> statusPair = evaluateResourceStatus(resource); - if (statusPair.getFirst() == Status.GOOD) { + StatusDescriptionPair statusDescriptionPair = evaluateResourceStatus(resource); + if (statusDescriptionPair._status == Status.GOOD) { _resourceIterator.remove(); } else { - LOGGER.info("Resource: {}, StatusDescription: {}", resource, statusPair.getSecond()); + LOGGER.info("Resource: {}, StatusDescription: {}", resource, statusDescriptionPair._description); if (--logCount <= 0) { break; } } } - _statusDescription = String.format("waitingFor=%s, numResourcesLeft=%d, numTotalResources=%d, minStartCount=%d," - + " resourceList=%s", getMatchName(), _resourcesToMonitor.size(), _numTotalResourcesToMonitor, - _minResourcesStartCount, getResourceListAsString()); + _statusDescription = String + .format("waitingFor=%s, numResourcesLeft=%d, numTotalResources=%d, minStartCount=%d," + " resourceList=%s", + getMatchName(), _resourcesToMonitor.size(), _numTotalResourcesToMonitor, _minResourcesStartCount, + getResourceListAsString()); LOGGER.info("Instance {} returning GOOD because {}", _instanceName, _statusDescription); } return Status.GOOD; } - private Pair<Status, String> evaluateResourceStatus(String resourceName) { + private StatusDescriptionPair evaluateResourceStatus(String resourceName) { IdealState idealState = getResourceIdealState(resourceName); // If the resource has been removed or disabled, ignore it if (idealState == null || !idealState.isEnabled()) { - return new Pair(Status.GOOD, STATUS_DESCRIPTION_NONE); + return new StatusDescriptionPair(Status.GOOD, STATUS_DESCRIPTION_NONE); } T helixState = getState(resourceName); if (helixState == null) { - return new Pair(Status.STARTING, _noHelixState); + return new StatusDescriptionPair(Status.STARTING, STATUS_DESCRIPTION_NO_HELIX_STATE); } // Check that all partitions that are supposed to be in any state other than OFFLINE have the same status in the @@ -325,13 +325,13 @@ public class ServiceStatus { if ("ERROR".equals(currentStateStatus)) { LOGGER.error(String.format("Resource: %s, partition: %s is in ERROR state", resourceName, partitionName)); } else { - String description = String.format("partition=%s, expected=%s, found=%s", partitionName, - idealStateStatus, currentStateStatus); - return new Pair(Status.STARTING, description); + String description = String + .format("partition=%s, expected=%s, found=%s", partitionName, idealStateStatus, currentStateStatus); + return new StatusDescriptionPair(Status.STARTING, description); } } } - return new Pair(Status.GOOD, STATUS_DESCRIPTION_NONE); + return new StatusDescriptionPair(Status.GOOD, STATUS_DESCRIPTION_NONE); } private String getResourceListAsString() { @@ -358,6 +358,7 @@ public class ServiceStatus { */ public static class IdealStateAndCurrentStateMatchServiceStatusCallback extends IdealStateMatchServiceStatusCallback<CurrentState> { private static final String MATCH_NAME = "CurrentStateMatch"; + public IdealStateAndCurrentStateMatchServiceStatusCallback(HelixManager helixManager, String clusterName, String instanceName, double minResourcesStartPercent) { super(helixManager, clusterName, instanceName, minResourcesStartPercent); @@ -394,6 +395,7 @@ public class ServiceStatus { */ public static class IdealStateAndExternalViewMatchServiceStatusCallback extends IdealStateMatchServiceStatusCallback<ExternalView> { private static final String MATCH_NAME = "ExternalViewMatch"; + public IdealStateAndExternalViewMatchServiceStatusCallback(HelixManager helixManager, String clusterName, String instanceName, double minResourcesStartPercent) { super(helixManager, clusterName, instanceName, minResourcesStartPercent); @@ -428,4 +430,14 @@ public class ServiceStatus { return MATCH_NAME; } } + + private static class StatusDescriptionPair { + Status _status; + String _description; + + StatusDescriptionPair(Status status, String description) { + _status = status; + _description = description; + } + } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index ad9884d..14b694f 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -20,7 +20,6 @@ package org.apache.pinot.integration.tests; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import java.io.File; import java.io.IOException; @@ -32,7 +31,6 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.utils.CommonConstants; @@ -61,7 +59,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet private static final String TEST_UPDATED_INVERTED_INDEX_QUERY = "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305"; - private static final List<String> UPDATED_BLOOM_FLITER_COLUMNS = Arrays.asList("Carrier"); + private static final List<String> UPDATED_BLOOM_FILTER_COLUMNS = Collections.singletonList("Carrier"); private static final String TEST_UPDATED_BLOOM_FILTER_QUERY = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'CA'"; // For default columns test @@ -95,23 +93,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet startBrokers(getNumBrokers()); startServers(getNumServers()); - // Set up service status callbacks - List<String> instances = _helixAdmin.getInstancesInCluster(_clusterName); - for (String instance : instances) { - if (instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)) { - _serviceStatusCallbacks.add( - new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _clusterName, instance, - Collections.singletonList(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE), 100.0)); - } - if (instance.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) { - _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList - .of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, _clusterName, - instance, 100.0), - new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _clusterName, - instance, 100.0)))); - } - } - // Unpack the Avro files List<File> avroFiles = unpackAvroData(_tempDir); @@ -140,6 +121,21 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet // Upload all segments uploadSegments(_tarDir); + // Set up service status callbacks + // NOTE: put this step after creating the table and uploading all segments so that brokers and servers can find the + // resources to monitor + List<String> instances = _helixAdmin.getInstancesInCluster(_clusterName); + for (String instance : instances) { + if (instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) || instance + .startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) { + _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList + .of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, _clusterName, + instance, 100.0), + new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _clusterName, + instance, 100.0)))); + } + } + // Wait for all documents loaded waitForAllDocsLoaded(600_000L); } @@ -184,17 +180,14 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null); - TestUtils.waitForCondition(new Function<Void, Boolean>() { - @Override - public Boolean apply(@Nullable Void aVoid) { - try { - JsonNode queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY); - // Total docs should not change during reload - assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs); - return queryResponse.get("numEntriesScannedInFilter").asLong() == 0L; - } catch (Exception e) { - throw new RuntimeException(e); - } + TestUtils.waitForCondition(aVoid -> { + try { + JsonNode queryResponse1 = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY); + // Total docs should not change during reload + assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs); + return queryResponse1.get("numEntriesScannedInFilter").asLong() == 0L; + } catch (Exception e) { + throw new RuntimeException(e); } }, 600_000L, "Failed to generate inverted index"); } @@ -208,23 +201,20 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet // Update table config and trigger reload updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, null, - UPDATED_BLOOM_FLITER_COLUMNS, getTaskConfig()); + UPDATED_BLOOM_FILTER_COLUMNS, getTaskConfig()); updateTableConfiguration(); sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null); - TestUtils.waitForCondition(new Function<Void, Boolean>() { - @Override - public Boolean apply(@Nullable Void aVoid) { - try { - JsonNode queryResponse = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY); - // Total docs should not change during reload - assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs); - return queryResponse.get("numSegmentsProcessed").asLong() == 0L; - } catch (Exception e) { - throw new RuntimeException(e); - } + TestUtils.waitForCondition(aVoid -> { + try { + JsonNode queryResponse1 = postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY); + // Total docs should not change during reload + assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs); + return queryResponse1.get("numSegmentsProcessed").asLong() == 0L; + } catch (Exception e) { + throw new RuntimeException(e); } }, 600_000L, "Failed to generate inverted index"); } @@ -285,22 +275,19 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet errorMessage = "Failed to remove default columns"; } - TestUtils.waitForCondition(new Function<Void, Boolean>() { - @Override - public Boolean apply(@Nullable Void aVoid) { - try { - JsonNode queryResponse = postQuery(TEST_DEFAULT_COLUMNS_QUERY); - // Total docs should not change during reload - assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs); - long count = queryResponse.get("aggregationResults").get(0).get("value").asLong(); - if (withExtraColumns) { - return count == numTotalDocs; - } else { - return count == 0; - } - } catch (Exception e) { - throw new RuntimeException(e); + TestUtils.waitForCondition(aVoid -> { + try { + JsonNode queryResponse = postQuery(TEST_DEFAULT_COLUMNS_QUERY); + // Total docs should not change during reload + assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs); + long count = queryResponse.get("aggregationResults").get(0).get("value").asLong(); + if (withExtraColumns) { + return count == numTotalDocs; + } else { + return count == 0; } + } catch (Exception e) { + throw new RuntimeException(e); } }, 600_000L, errorMessage); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
