Copilot commented on code in PR #17400:
URL: https://github.com/apache/pinot/pull/17400#discussion_r2634090047
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java:
##########
@@ -722,79 +722,150 @@ public boolean isReloadJobCompleted(String reloadJobId)
     return jobStatus.get("totalSegmentCount").asInt() == jobStatus.get("successCount").asInt();
   }
-  /**
-   * TODO: Support removing new added columns for MutableSegment and remove the new added columns before running the
-   * next test. Use this to replace {@link OfflineClusterIntegrationTest#testDefaultColumns(boolean)}.
-   */
+  /// TODO: Unify this and [OfflineClusterIntegrationTest#testDefaultColumns(boolean)]
   public void testReload(boolean includeOfflineTable)
       throws Exception {
+    testReload(includeOfflineTable, false);
+    testReload(includeOfflineTable, true);
+  }
+
+  private void testReload(boolean includeOfflineTable, boolean forceDownload)
+      throws Exception {
     String rawTableName = getTableName();
-    Schema schema = getSchema(getTableName());
+    Schema oldSchema = getSchema(rawTableName);
     String selectStarQuery = "SELECT * FROM " + rawTableName;
     JsonNode queryResponse = postQuery(selectStarQuery);
-    assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), schema.size());
+    assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), oldSchema.size());
     long numTotalDocs = queryResponse.get("totalDocs").asLong();
-    addNewSchemaFields(schema);
-    String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName);
+    Schema newSchema = createSchema();
Review Comment:
Creating a new schema via `createSchema()` and then modifying it with
`addNewSchemaFields()` differs from the original approach, which modified the
existing schema in place. Verify that `createSchema()` produces a schema
identical to what `getSchema(rawTableName)` returns before the new fields are
added; otherwise the test behavior may have changed unintentionally.
```suggestion
Schema newSchema = getSchema(rawTableName);
```
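If a guard is wanted, a minimal sketch is shown below. It is not part of the PR: it assumes the check would sit at the top of `testReload` before `addNewSchemaFields(newSchema)` runs, that Pinot's `Schema#equals` compares the full set of field specs, and it reuses the existing `createSchema()` / `getSchema()` helpers of this test set.
```java
// Illustrative guard only (not in the PR): fail fast if createSchema() drifts from the
// schema currently registered for the table, before any new fields are added to it.
Schema createdSchema = createSchema();
Schema registeredSchema = getSchema(rawTableName);
// Assumes Schema#equals compares the full set of field specs.
assertEquals(createdSchema, registeredSchema,
    "createSchema() no longer matches the registered schema for table: " + rawTableName);
```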
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java:
##########
@@ -722,79 +722,150 @@ public boolean isReloadJobCompleted(String reloadJobId)
     return jobStatus.get("totalSegmentCount").asInt() == jobStatus.get("successCount").asInt();
   }
-  /**
-   * TODO: Support removing new added columns for MutableSegment and remove the new added columns before running the
-   * next test. Use this to replace {@link OfflineClusterIntegrationTest#testDefaultColumns(boolean)}.
-   */
+  /// TODO: Unify this and [OfflineClusterIntegrationTest#testDefaultColumns(boolean)]
   public void testReload(boolean includeOfflineTable)
       throws Exception {
+    testReload(includeOfflineTable, false);
+    testReload(includeOfflineTable, true);
+  }
+
+  private void testReload(boolean includeOfflineTable, boolean forceDownload)
+      throws Exception {
     String rawTableName = getTableName();
-    Schema schema = getSchema(getTableName());
+    Schema oldSchema = getSchema(rawTableName);
     String selectStarQuery = "SELECT * FROM " + rawTableName;
     JsonNode queryResponse = postQuery(selectStarQuery);
-    assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), schema.size());
+    assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), oldSchema.size());
     long numTotalDocs = queryResponse.get("totalDocs").asLong();
-    addNewSchemaFields(schema);
-    String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName);
+    Schema newSchema = createSchema();
+    addNewSchemaFields(newSchema);
+
+    // Without reload, select star should be able to include new added columns after schema change is propagated to both
+    // broker and server
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode testQueryResponse = postQuery(selectStarQuery);
+        // If schema change is propagated to broker before server, server might not be able to find the column for a
+        // short period of time
+        if (!testQueryResponse.get("exceptions").isEmpty()) {
+          return false;
+        }
+        return testQueryResponse.get("resultTable").get("dataSchema").get("columnNames").size() == newSchema.size();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }, 60_000L, "Failed to generate default virtual columns without reload");
+
+    // Test reload needed, and trigger reload
     String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName);
-    String reloadJob;
-    // Tests that reload is needed on the table from controller api segments/{tableNameWithType}/needReload
+    testTableNeedReload(realtimeTableName, true);
+    String realtimeReloadJobId = reloadTableAndValidateResponse(rawTableName, TableType.REALTIME, forceDownload);
+    String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName);
+    String offlineReloadJobId;
     if (includeOfflineTable) {
       testTableNeedReload(offlineTableName, true);
-      // Reload the table
-      reloadJob = reloadTableAndValidateResponse(rawTableName, TableType.OFFLINE, false);
+      offlineReloadJobId = reloadTableAndValidateResponse(rawTableName, TableType.OFFLINE, forceDownload);
+    } else {
+      offlineReloadJobId = null;
+    }
+
+    // Wait for reload job to finish
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode testQueryResponse = postQuery(selectStarQuery);
+        // Should not throw exception during reload
+        assertEquals(testQueryResponse.get("exceptions").size(), 0,
+            String.format("Found exceptions when testing reload for query: %s and response: %s", selectStarQuery,
+                testQueryResponse));
+        // Total docs should not change during reload
+        assertEquals(testQueryResponse.get("totalDocs").asLong(), numTotalDocs,
+            String.format("Total docs changed after reload, query: %s and response: %s", selectStarQuery,
+                testQueryResponse));
+        assertEquals(testQueryResponse.get("resultTable").get("dataSchema").get("columnNames").size(),
+            newSchema.size());
+        return isReloadJobCompleted(realtimeReloadJobId) && (offlineReloadJobId == null || isReloadJobCompleted(
+            offlineReloadJobId));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }, 600_000L, "Failed to finish reload job");
+
+    // Test reload not needed after previous reload completed
+    testTableNeedReload(realtimeTableName, false);
+    if (includeOfflineTable) {
+      testTableNeedReload(offlineTableName, false);
     }
-    testTableNeedReload(realtimeTableName, true);
-    reloadJob = reloadTableAndValidateResponse(rawTableName, TableType.REALTIME, false);
-    // Wait for all segments to finish reloading, and test filter on all newly added columns
-    // NOTE: Use count query to prevent schema inconsistency error
-    String testQuery = "SELECT COUNT(*) FROM " + rawTableName
+    // Test filter on all newly added columns
+    String filterQuery = "SELECT COUNT(*) FROM " + rawTableName
         + " WHERE NewIntSVDimension < 0 AND NewLongSVDimension < 0 AND NewFloatSVDimension < 0 AND "
         + "NewDoubleSVDimension < 0 AND NewStringSVDimension = 'null' AND NewIntMVDimension < 0 AND "
         + "NewLongMVDimension < 0 AND NewFloatMVDimension < 0 AND NewDoubleMVDimension < 0 AND "
         + "NewStringMVDimension = 'null' AND NewIntMetric = 0 AND NewLongMetric = 0 AND NewFloatMetric = 0 "
         + "AND NewDoubleMetric = 0 AND NewBytesMetric = ''";
-    long countStarResult = getCountStarResult();
-    String finalReloadJob = reloadJob;
+    queryResponse = postQuery(filterQuery);
+    assertTrue(queryResponse.get("exceptions").isEmpty());
+    assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
+    assertEquals(queryResponse.get("numDocsScanned").asLong(), numTotalDocs);
+    assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asLong(), numTotalDocs);
+
+    // Remove the extra columns
Review Comment:
The method `forceUpdateSchema` is called here, but its behavior and how it
differs from `updateSchema` (used on line 889) are unclear from context.
Consider whether this should also be `updateSchema` for consistency, or
document why a force update is required in this specific case.
```suggestion
    // Remove the extra columns. This rollback drops columns added earlier in the test, which is not a
    // backward-compatible schema change, so we use forceUpdateSchema instead of updateSchema.
```
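For illustration, the rollback step the suggested comment describes might look roughly like the sketch below. The helper names (`createSchema`, `updateSchema`, `forceUpdateSchema`) come from this test class; the claim that a plain `updateSchema` would be rejected for a column-dropping change is an assumption about the controller's backward-compatibility check, not something verified in this PR.
```java
// Sketch of the rollback (assumption: dropping columns fails the controller's
// backward-compatibility check, so a plain update would be rejected).
Schema originalSchema = createSchema();  // the schema without the New* columns
// updateSchema(originalSchema);         // assumed to be rejected: columns are removed
forceUpdateSchema(originalSchema);       // assumed to bypass the compatibility check
```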
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java:
##########
@@ -815,7 +886,7 @@ private void addNewSchemaFields(Schema schema)
schema.addField(constructNewMetric(FieldSpec.DataType.DOUBLE));
schema.addField(constructNewMetric(FieldSpec.DataType.BYTES));
// Upload the schema with extra columns
- addSchema(schema);
+ updateSchema(schema);
Review Comment:
The call was changed from `addSchema(schema)` to `updateSchema(schema)`, which
appears unrelated to the PR's stated purpose of removing the force download
check. If `updateSchema` is the correct method for adding new fields to an
already registered schema, this is fine; if the two calls are functionally
different, the change could introduce unexpected behavior. Verify that
`updateSchema` is the appropriate method here.
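As a rough mental model only (assuming these test helpers wrap the controller's schema endpoints, which has not been verified here): `addSchema` would correspond to registering a brand-new schema via `POST /schemas`, while `updateSchema` would correspond to modifying an existing one via `PUT /schemas/{schemaName}`. Under that assumption, the call site reads as:
```java
// addNewSchemaFields(schema) mutates a schema that is already registered for the table,
// so an in-place update matches the intent here (assumption: updateSchema wraps
// PUT /schemas/{schemaName}, while addSchema wraps POST /schemas).
updateSchema(schema);
```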