Copilot commented on code in PR #17400:
URL: https://github.com/apache/pinot/pull/17400#discussion_r2634090047


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java:
##########
@@ -722,79 +722,150 @@ public boolean isReloadJobCompleted(String reloadJobId)
     return jobStatus.get("totalSegmentCount").asInt() == jobStatus.get("successCount").asInt();
   }
 
-  /**
-   * TODO: Support removing new added columns for MutableSegment and remove the new added columns before running the
-   *       next test. Use this to replace {@link OfflineClusterIntegrationTest#testDefaultColumns(boolean)}.
-   */
+  /// TODO: Unify this and [OfflineClusterIntegrationTest#testDefaultColumns(boolean)]
   public void testReload(boolean includeOfflineTable)
       throws Exception {
+    testReload(includeOfflineTable, false);
+    testReload(includeOfflineTable, true);
+  }
+
+  private void testReload(boolean includeOfflineTable, boolean forceDownload)
+      throws Exception {
     String rawTableName = getTableName();
-    Schema schema = getSchema(getTableName());
+    Schema oldSchema = getSchema(rawTableName);
 
     String selectStarQuery = "SELECT * FROM " + rawTableName;
     JsonNode queryResponse = postQuery(selectStarQuery);
-    assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), schema.size());
+    assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), oldSchema.size());
     long numTotalDocs = queryResponse.get("totalDocs").asLong();
 
-    addNewSchemaFields(schema);
-    String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName);
+    Schema newSchema = createSchema();

Review Comment:
   Creating a new schema via `createSchema()` and then modifying it with `addNewSchemaFields()` differs
   from the original approach, which modified the existing schema directly. Verify that `createSchema()`
   produces a schema identical to what `getSchema(rawTableName)` returns initially; otherwise, the test
   behavior may have changed unintentionally.
   ```suggestion
       Schema newSchema = getSchema(rawTableName);
   ```
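   If `createSchema()` is kept, one cheap guard against silent drift is to assert that it still matches
   what the controller serves before the new fields are added. A minimal sketch, reusing the helper names
   from the surrounding test class and assuming `Schema` implements value-based `equals`:
  ```java
// Hedged sketch: check the freshly built schema against the one currently registered
// with the controller before mutating it, so any drift fails fast with a clear message.
Schema newSchema = createSchema();
assertEquals(newSchema, getSchema(rawTableName),
    "createSchema() no longer matches the schema registered for table: " + rawTableName);
addNewSchemaFields(newSchema);
  ```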



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java:
##########
@@ -722,79 +722,150 @@ public boolean isReloadJobCompleted(String reloadJobId)
     return jobStatus.get("totalSegmentCount").asInt() == jobStatus.get("successCount").asInt();
   }
 
-  /**
-   * TODO: Support removing new added columns for MutableSegment and remove the new added columns before running the
-   *       next test. Use this to replace {@link OfflineClusterIntegrationTest#testDefaultColumns(boolean)}.
-   */
+  /// TODO: Unify this and [OfflineClusterIntegrationTest#testDefaultColumns(boolean)]
   public void testReload(boolean includeOfflineTable)
       throws Exception {
+    testReload(includeOfflineTable, false);
+    testReload(includeOfflineTable, true);
+  }
+
+  private void testReload(boolean includeOfflineTable, boolean forceDownload)
+      throws Exception {
     String rawTableName = getTableName();
-    Schema schema = getSchema(getTableName());
+    Schema oldSchema = getSchema(rawTableName);
 
     String selectStarQuery = "SELECT * FROM " + rawTableName;
     JsonNode queryResponse = postQuery(selectStarQuery);
-    assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), schema.size());
+    assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), oldSchema.size());
     long numTotalDocs = queryResponse.get("totalDocs").asLong();
 
-    addNewSchemaFields(schema);
-    String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName);
+    Schema newSchema = createSchema();
+    addNewSchemaFields(newSchema);
+
+    // Without reload, select star should be able to include new added columns after schema change is propagated to both
+    // broker and server
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode testQueryResponse = postQuery(selectStarQuery);
+        // If schema change is propagated to broker before server, server might not be able to find the column for a
+        // short period of time
+        if (!testQueryResponse.get("exceptions").isEmpty()) {
+          return false;
+        }
+        return testQueryResponse.get("resultTable").get("dataSchema").get("columnNames").size() == newSchema.size();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }, 60_000L, "Failed to generate default virtual columns without reload");
+
+    // Test reload needed, and trigger reload
     String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName);
-    String reloadJob;
-    // Tests that reload is needed on the table from controller api segments/{tableNameWithType}/needReload
+    testTableNeedReload(realtimeTableName, true);
+    String realtimeReloadJobId = reloadTableAndValidateResponse(rawTableName, TableType.REALTIME, forceDownload);
+    String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName);
+    String offlineReloadJobId;
     if (includeOfflineTable) {
       testTableNeedReload(offlineTableName, true);
-      // Reload the table
-      reloadJob = reloadTableAndValidateResponse(rawTableName, TableType.OFFLINE, false);
+      offlineReloadJobId = reloadTableAndValidateResponse(rawTableName, TableType.OFFLINE, forceDownload);
+    } else {
+      offlineReloadJobId = null;
+    }
+
+    // Wait for reload job to finish
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode testQueryResponse = postQuery(selectStarQuery);
+        // Should not throw exception during reload
+        assertEquals(testQueryResponse.get("exceptions").size(), 0,
+            String.format("Found exceptions when testing reload for query: %s and response: %s", selectStarQuery,
+                testQueryResponse));
+        // Total docs should not change during reload
+        assertEquals(testQueryResponse.get("totalDocs").asLong(), numTotalDocs,
+            String.format("Total docs changed after reload, query: %s and response: %s", selectStarQuery,
+                testQueryResponse));
+        assertEquals(testQueryResponse.get("resultTable").get("dataSchema").get("columnNames").size(),
+            newSchema.size());
+        return isReloadJobCompleted(realtimeReloadJobId) && (offlineReloadJobId == null || isReloadJobCompleted(
+            offlineReloadJobId));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }, 600_000L, "Failed to finish reload job");
+
+    // Test reload not needed after previous reload completed
+    testTableNeedReload(realtimeTableName, false);
+    if (includeOfflineTable) {
+      testTableNeedReload(offlineTableName, false);
     }
-    testTableNeedReload(realtimeTableName, true);
-    reloadJob = reloadTableAndValidateResponse(rawTableName, TableType.REALTIME, false);
 
-    // Wait for all segments to finish reloading, and test filter on all newly added columns
-    // NOTE: Use count query to prevent schema inconsistency error
-    String testQuery = "SELECT COUNT(*) FROM " + rawTableName
+    // Test filter on all newly added columns
+    String filterQuery = "SELECT COUNT(*) FROM " + rawTableName
         + " WHERE NewIntSVDimension < 0 AND NewLongSVDimension < 0 AND NewFloatSVDimension < 0 AND "
        + "NewDoubleSVDimension < 0 AND NewStringSVDimension = 'null' AND NewIntMVDimension < 0 AND "
        + "NewLongMVDimension < 0 AND NewFloatMVDimension < 0 AND NewDoubleMVDimension < 0 AND "
        + "NewStringMVDimension = 'null' AND NewIntMetric = 0 AND NewLongMetric = 0 AND NewFloatMetric = 0 "
        + "AND NewDoubleMetric = 0 AND NewBytesMetric = ''";
-    long countStarResult = getCountStarResult();
-    String finalReloadJob = reloadJob;
+    queryResponse = postQuery(filterQuery);
+    assertTrue(queryResponse.get("exceptions").isEmpty());
+    assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
+    assertEquals(queryResponse.get("numDocsScanned").asLong(), numTotalDocs);
+    assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asLong(), numTotalDocs);
+
+    // Remove the extra columns

Review Comment:
   The method `forceUpdateSchema` is called here, but its behavior and how it differs from `updateSchema`
   (used on line 889) are unclear from context. Consider whether this should also be `updateSchema` for
   consistency, or document why a force update is required in this specific case.
   ```suggestion
    // Remove the extra columns. This rollback drops columns added earlier in the test, which is not a
    // backward-compatible schema change, so we use forceUpdateSchema instead of updateSchema.
   ```
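   For context, the reason the rollback typically needs a force update can be illustrated with pinot-spi's
   `Schema#isBackwardCompatibleWith`: adding columns keeps the new schema backward compatible with the old
   one, while dropping them does not. A minimal, standalone sketch (assumes `pinot-spi` is on the classpath;
   that the controller's update validation reduces to exactly this check is an assumption):
  ```java
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;

public final class SchemaCompatibilitySketch {
  public static void main(String[] args) {
    Schema original = new Schema.SchemaBuilder()
        .setSchemaName("mytable")
        .addSingleValueDimension("ExistingDimension", DataType.STRING)
        .build();

    Schema withNewColumn = new Schema.SchemaBuilder()
        .setSchemaName("mytable")
        .addSingleValueDimension("ExistingDimension", DataType.STRING)
        .addSingleValueDimension("NewIntSVDimension", DataType.INT)
        .build();

    // Adding NewIntSVDimension: the new schema is backward compatible with the old one.
    System.out.println(withNewColumn.isBackwardCompatibleWith(original));  // expected: true
    // Rolling back to the original schema drops NewIntSVDimension: not backward compatible,
    // which is why the cleanup step would need a force update rather than a plain update.
    System.out.println(original.isBackwardCompatibleWith(withNewColumn)); // expected: false
  }
}
  ```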



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java:
##########
@@ -815,7 +886,7 @@ private void addNewSchemaFields(Schema schema)
     schema.addField(constructNewMetric(FieldSpec.DataType.DOUBLE));
     schema.addField(constructNewMetric(FieldSpec.DataType.BYTES));
     // Upload the schema with extra columns
-    addSchema(schema);
+    updateSchema(schema);

Review Comment:
   The method call was changed from `addSchema(schema)` to `updateSchema(schema)`. This change appears
   unrelated to the PR's stated purpose of removing the force-download check. If `updateSchema` is the
   correct call for adding new fields to a schema that already exists, this is fine; if the two operations
   differ functionally, the change could introduce unexpected behavior. Verify that `updateSchema` is the
   appropriate method here.
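   For reference, a hedged sketch of the REST-level distinction the two helpers usually correspond to,
   assuming the controller's JSON-body variants of the standard schema endpoints (`POST /schemas` to
   register a new schema, `PUT /schemas/{schemaName}` to update an existing one); the actual test helper
   implementations may differ. Controller URL and payload are placeholders:
  ```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public final class SchemaUpdateSketch {
  public static void main(String[] args) throws Exception {
    String controller = "http://localhost:9000";
    String schemaJson = "{\"schemaName\":\"mytable\",\"dimensionFieldSpecs\":"
        + "[{\"name\":\"ExistingDimension\",\"dataType\":\"STRING\"}]}";
    HttpClient client = HttpClient.newHttpClient();

    // Create: only valid while no schema named "mytable" has been registered yet.
    HttpRequest create = HttpRequest.newBuilder(URI.create(controller + "/schemas"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(schemaJson))
        .build();
    System.out.println(client.send(create, HttpResponse.BodyHandlers.ofString()).statusCode());

    // Update: replaces the already-registered schema, e.g. after appending new columns.
    HttpRequest update = HttpRequest.newBuilder(URI.create(controller + "/schemas/mytable"))
        .header("Content-Type", "application/json")
        .PUT(HttpRequest.BodyPublishers.ofString(schemaJson))
        .build();
    System.out.println(client.send(update, HttpResponse.BodyHandlers.ofString()).statusCode());
  }
}
  ```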



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

