This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cc3eb2e9c2 Refactor TableRebalanceIntegrationTest /
HybridClusterIntegrationTest to avoid duplication in test runs (#15929)
cc3eb2e9c2 is described below
commit cc3eb2e9c296761aa3db7e7e6a9fa7d66bd213bc
Author: Yash Mayya <[email protected]>
AuthorDate: Wed May 28 16:40:49 2025 +0100
Refactor TableRebalanceIntegrationTest / HybridClusterIntegrationTest to
avoid duplication in test runs (#15929)
---
.../tests/BaseHybridClusterIntegrationTest.java | 171 +++++++++++++++++++++
.../tests/HybridClusterIntegrationTest.java | 148 +-----------------
.../tests/TableRebalanceIntegrationTest.java | 2 +-
3 files changed, 173 insertions(+), 148 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseHybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseHybridClusterIntegrationTest.java
new file mode 100644
index 0000000000..107e8a9edd
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseHybridClusterIntegrationTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+/**
+ * Hybrid cluster integration test that uploads 8 months of data as offline and 6 months of data as realtime (with a
+ * two month overlap).
+ */
+public class BaseHybridClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+ private static final String TENANT_NAME = "TestTenant";
+ private static final int NUM_OFFLINE_SEGMENTS = 8;
+ private static final int NUM_REALTIME_SEGMENTS = 6;
+ protected static final int NUM_SERVERS_OFFLINE = 1;
+ protected static final int NUM_SERVERS_REALTIME = 1;
+ protected static final int NUM_SERVERS = NUM_SERVERS_OFFLINE + NUM_SERVERS_REALTIME;
+
+ @Override
+ protected String getBrokerTenant() {
+ return TENANT_NAME;
+ }
+
+ @Override
+ protected String getServerTenant() {
+ return TENANT_NAME;
+ }
+
+ @Override
+ protected void overrideControllerConf(Map<String, Object> properties) {
+ properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
+ }
+
+ protected void overrideBrokerConf(PinotConfiguration configuration) {
+ configuration.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_INSTANCE_TAGS,
+ TagNameUtils.getBrokerTagForTenant(TENANT_NAME));
+ }
+
+ @Override
+ protected void overrideServerConf(PinotConfiguration configuration) {
+ configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, false);
+ }
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ // Start Zk, Kafka and Pinot
+ startHybridCluster();
+
+ List<File> avroFiles = getAllAvroFiles();
+ List<File> offlineAvroFiles = getOfflineAvroFiles(avroFiles, NUM_OFFLINE_SEGMENTS);
+ List<File> realtimeAvroFiles = getRealtimeAvroFiles(avroFiles, NUM_REALTIME_SEGMENTS);
+
+ // Create and upload the schema and table config
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig offlineTableConfig = createOfflineTableConfig();
+ addTableConfig(offlineTableConfig);
+ addTableConfig(createRealtimeTableConfig(realtimeAvroFiles.get(0)));
+
+ // Create and upload segments
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir,
+ _tarDir);
+ uploadSegments(getTableName(), _tarDir);
+
+ // Push data into Kafka
+ pushAvroIntoKafka(realtimeAvroFiles);
+
+ // Set up the H2 connection
+ setUpH2Connection(avroFiles);
+
+ // Initialize the query generator
+ setUpQueryGenerator(avroFiles);
+
+ // Wait for all documents loaded
+ waitForAllDocsLoaded(600_000L);
+ }
+
+ protected void startHybridCluster()
+ throws Exception {
+ startZk();
+ startController();
+ HelixConfigScope scope =
+ new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+ .build();
+ // Set max segment preprocess parallelism to 10
+ _helixManager.getConfigAccessor()
+ .set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM, Integer.toString(10));
+ // Set max segment startree preprocess parallelism to 6
+ _helixManager.getConfigAccessor()
+ .set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM, Integer.toString(6));
+ // Set max segment download parallelism to 12 to test that all segments can be processed
+ _helixManager.getConfigAccessor()
+ .set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM, Integer.toString(12));
+ startBroker();
+ startServers(NUM_SERVERS);
+ startKafka();
+
+ // Create tenants
+ createServerTenant(TENANT_NAME, NUM_SERVERS_OFFLINE, NUM_SERVERS_REALTIME);
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ // Try deleting the tables and check that they have no routing table
+ String tableName = getTableName();
+ dropOfflineTable(tableName);
+ dropRealtimeTable(tableName);
+
+ // Routing should be removed after deleting all tables
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ getDebugInfo("debug/routingTable/" + tableName);
+ return false;
+ } catch (Exception e) {
+ // only return true if 404 not found error is thrown.
+ return e.getMessage().contains("Got error status code: 404");
+ }
+ }, 60_000L, "Routing table is not empty after dropping all tables");
+
+ stopServer();
+ stopBroker();
+ stopController();
+ stopKafka();
+ stopZk();
+ cleanupHybridCluster();
+ }
+
+ /**
+ * Can be overridden to preserve segments.
+ *
+ * @throws Exception
+ */
+ protected void cleanupHybridCluster()
+ throws Exception {
+ FileUtils.deleteDirectory(_tempDir);
+ }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index 845d04970e..e72a054ec5 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -19,30 +19,18 @@
package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
-import java.io.File;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
-import org.apache.commons.io.FileUtils;
import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
import org.apache.pinot.common.utils.URIUtils;
-import org.apache.pinot.common.utils.config.TagNameUtils;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -50,104 +38,7 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.fail;
-/**
- * Hybrid cluster integration test that uploads 8 months of data as offline and 6 months of data as realtime (with a
- * two month overlap).
- */
-public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet {
- private static final String TENANT_NAME = "TestTenant";
- private static final int NUM_OFFLINE_SEGMENTS = 8;
- private static final int NUM_REALTIME_SEGMENTS = 6;
- protected static final int NUM_SERVERS_OFFLINE = 1;
- protected static final int NUM_SERVERS_REALTIME = 1;
- protected static final int NUM_SERVERS = NUM_SERVERS_OFFLINE + NUM_SERVERS_REALTIME;
-
- @Override
- protected String getBrokerTenant() {
- return TENANT_NAME;
- }
-
- @Override
- protected String getServerTenant() {
- return TENANT_NAME;
- }
-
- @Override
- protected void overrideControllerConf(Map<String, Object> properties) {
- properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
- }
-
- protected void overrideBrokerConf(PinotConfiguration configuration) {
- configuration.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_INSTANCE_TAGS,
- TagNameUtils.getBrokerTagForTenant(TENANT_NAME));
- }
-
- @Override
- protected void overrideServerConf(PinotConfiguration configuration) {
- configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, false);
- }
-
- @BeforeClass
- public void setUp()
- throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
- // Start Zk, Kafka and Pinot
- startHybridCluster();
-
- List<File> avroFiles = getAllAvroFiles();
- List<File> offlineAvroFiles = getOfflineAvroFiles(avroFiles, NUM_OFFLINE_SEGMENTS);
- List<File> realtimeAvroFiles = getRealtimeAvroFiles(avroFiles, NUM_REALTIME_SEGMENTS);
-
- // Create and upload the schema and table config
- Schema schema = createSchema();
- addSchema(schema);
- TableConfig offlineTableConfig = createOfflineTableConfig();
- addTableConfig(offlineTableConfig);
- addTableConfig(createRealtimeTableConfig(realtimeAvroFiles.get(0)));
-
- // Create and upload segments
- ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir,
- _tarDir);
- uploadSegments(getTableName(), _tarDir);
-
- // Push data into Kafka
- pushAvroIntoKafka(realtimeAvroFiles);
-
- // Set up the H2 connection
- setUpH2Connection(avroFiles);
-
- // Initialize the query generator
- setUpQueryGenerator(avroFiles);
-
- // Wait for all documents loaded
- waitForAllDocsLoaded(600_000L);
- }
-
- protected void startHybridCluster()
- throws Exception {
- startZk();
- startController();
- HelixConfigScope scope =
- new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
- .build();
- // Set max segment preprocess parallelism to 10
- _helixManager.getConfigAccessor()
- .set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM, Integer.toString(10));
- // Set max segment startree preprocess parallelism to 6
- _helixManager.getConfigAccessor()
- .set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM, Integer.toString(6));
- // Set max segment download parallelism to 12 to test that all segments can be processed
- _helixManager.getConfigAccessor()
- .set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM, Integer.toString(12));
- startBroker();
- startServers(NUM_SERVERS);
- startKafka();
-
- // Create tenants
- createServerTenant(TENANT_NAME, NUM_SERVERS_OFFLINE, NUM_SERVERS_REALTIME);
- }
-
+public class HybridClusterIntegrationTest extends BaseHybridClusterIntegrationTest {
@Test
public void testUpdateBrokerResource()
throws Exception {
@@ -417,41 +308,4 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
throws Exception {
super.testVirtualColumnQueries();
}
-
- @AfterClass
- public void tearDown()
- throws Exception {
- // Try deleting the tables and check that they have no routing table
- String tableName = getTableName();
- dropOfflineTable(tableName);
- dropRealtimeTable(tableName);
-
- // Routing should be removed after deleting all tables
- TestUtils.waitForCondition(aVoid -> {
- try {
- getDebugInfo("debug/routingTable/" + tableName);
- return false;
- } catch (Exception e) {
- // only return true if 404 not found error is thrown.
- return e.getMessage().contains("Got error status code: 404");
- }
- }, 60_000L, "Routing table is not empty after dropping all tables");
-
- stopServer();
- stopBroker();
- stopController();
- stopKafka();
- stopZk();
- cleanupHybridCluster();
- }
-
- /**
- * Can be overridden to preserve segments.
- *
- * @throws Exception
- */
- protected void cleanupHybridCluster()
- throws Exception {
- FileUtils.deleteDirectory(_tempDir);
- }
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
index 424c0f24d8..9c778908de 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
@@ -64,7 +64,7 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
-public class TableRebalanceIntegrationTest extends HybridClusterIntegrationTest {
+public class TableRebalanceIntegrationTest extends BaseHybridClusterIntegrationTest {
private static String getQueryString(RebalanceConfig rebalanceConfig) {
return "dryRun=" + rebalanceConfig.isDryRun() + "&preChecks=" + rebalanceConfig.isPreChecks()
+ "&reassignInstances=" + rebalanceConfig.isReassignInstances()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]