This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a5b3483a83 [Flaky-test] Fix
LLCRealtimeClusterIntegrationTest.testReset (#11806)
a5b3483a83 is described below
commit a5b3483a838839e606b251f2bb6e0d0b22f75a93
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Sun Oct 15 22:14:34 2023 -0700
[Flaky-test] Fix LLCRealtimeClusterIntegrationTest.testReset (#11806)
---
.../tests/BaseClusterIntegrationTestSet.java | 63 ++++++++++++----------
.../tests/LLCRealtimeClusterIntegrationTest.java | 5 +-
.../tests/OfflineClusterIntegrationTest.java | 5 +-
3 files changed, 40 insertions(+), 33 deletions(-)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index 1d2a75215f..9ce9c69959 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -23,14 +23,16 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.Message;
import org.apache.pinot.client.ResultSet;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.common.exception.QueryException;
@@ -605,40 +607,47 @@ public abstract class BaseClusterIntegrationTestSet
extends BaseClusterIntegrati
}, 60_000L, errorMessage);
}
- public void testReset(TableType tableType)
- throws Exception {
+ public void testReset(TableType tableType) {
String rawTableName = getTableName();
+ String tableNameWithType =
TableNameBuilder.forType(tableType).tableNameWithType(rawTableName);
- // reset the table.
- resetTable(rawTableName, tableType, null);
-
- // wait for all live messages clear the queue.
- List<String> instances =
_helixResourceManager.getServerInstancesForTable(rawTableName, tableType);
- PropertyKey.Builder keyBuilder = _helixDataAccessor.keyBuilder();
+ // Reset the table.
+ // NOTE: Reset table might fail if there are pending messages, so we need
to retry until it succeeds.
TestUtils.waitForCondition(aVoid -> {
- int liveMessageCount = 0;
- for (String instanceName : instances) {
- List<Message> messages =
_helixDataAccessor.getChildValues(keyBuilder.messages(instanceName), true);
- liveMessageCount += messages.size();
+ try {
+ resetTable(rawTableName, tableType, null);
+ return true;
+ } catch (IOException e) {
+ assertTrue(e.toString().contains("pending message"), "Got unexpected
exception: " + e);
+ return false;
}
- return liveMessageCount == 0;
- }, 30_000L, "Failed to wait for all segment reset messages clear helix
state transition!");
-
- // Check that all segment states come back to ONLINE.
+ }, 30_000L, "Failed to reset table: " + tableNameWithType);
+
+ // Wait for all the reset messages being processed.
+ IdealState idealState =
_helixResourceManager.getTableIdealState(tableNameWithType);
+ assertNotNull(idealState, "Failed to find ideal state for table: " +
tableNameWithType);
+ Set<String> instances = new HashSet<>();
+ for (Map<String, String> instanceStateMap :
idealState.getRecord().getMapFields().values()) {
+ instances.addAll(instanceStateMap.keySet());
+ }
+ PropertyKey.Builder keyBuilder = _helixDataAccessor.keyBuilder();
TestUtils.waitForCondition(aVoid -> {
- // check external view and wait for everything to come back online
- ExternalView externalView =
_helixAdmin.getResourceExternalView(getHelixClusterName(),
- TableNameBuilder.forType(tableType).tableNameWithType(rawTableName));
- for (Map<String, String> externalViewStateMap :
externalView.getRecord().getMapFields().values()) {
- for (String state : externalViewStateMap.values()) {
- if
(!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)
- &&
!CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING.equals(state)) {
- return false;
- }
+ for (String instanceName : instances) {
+ if
(!_helixDataAccessor.getChildNames(keyBuilder.messages(instanceName)).isEmpty())
{
+ return false;
}
}
return true;
- }, 30_000L, "Failed to wait for all segments come back online");
+ }, 30_000L, "Failed to process all the reset messages");
+
+ // Wait for external view converging with ideal state.
+ TestUtils.waitForCondition(aVoid -> {
+ IdealState is =
_helixResourceManager.getTableIdealState(tableNameWithType);
+ assertNotNull(is, "Failed to find ideal state for table: " +
tableNameWithType);
+ ExternalView ev =
_helixResourceManager.getTableExternalView(tableNameWithType);
+ assertNotNull(ev, "Failed to find external view for table: " +
tableNameWithType);
+ return
ev.getRecord().getMapFields().equals(is.getRecord().getMapFields());
+ }, 30_000L, "Failed to match the ideal state");
}
public String reloadTableAndValidateResponse(String tableName, TableType
tableType, boolean forceDownload)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 65d2adb9b3..51b19993f1 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -313,9 +313,8 @@ public class LLCRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegr
}
@Test
- public void testReset()
- throws Exception {
- super.testReset(TableType.REALTIME);
+ public void testReset() {
+ testReset(TableType.REALTIME);
}
@Test
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index a5057d44ef..b129783170 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -2972,9 +2972,8 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
}
@Test
- public void testReset()
- throws Exception {
- super.testReset(TableType.OFFLINE);
+ public void testReset() {
+ testReset(TableType.OFFLINE);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]