This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 169aba9fc9 Fix schema name in table config during controller startup (#11574)
169aba9fc9 is described below
commit 169aba9fc9dded326c1ed9cb31cd7ad6e1752089
Author: Xiang Fu <[email protected]>
AuthorDate: Tue Oct 10 17:55:25 2023 -0700
Fix schema name in table config during controller startup (#11574)
* Adding a controller task to update schema name
* Misc fixes
* Fix linter
---------
Co-authored-by: Xiaotian (Jackie) Jiang <[email protected]>
---
.../pinot/common/metrics/ControllerGauge.java | 17 +-
.../pinot/controller/BaseControllerStarter.java | 117 +++++++++
.../cleanup/SchemaCleanupTaskStatelessTest.java | 285 +++++++++++++++++++++
3 files changed, 418 insertions(+), 1 deletion(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 64ced0203c..887b342110 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -132,7 +132,22 @@ public enum ControllerGauge implements
AbstractMetrics.Gauge {
MAX_RECORDS_LAG("maxRecordsLag", false),
// Consumption availability lag in ms at a partition level
- MAX_RECORD_AVAILABILITY_LAG_MS("maxRecordAvailabilityLagMs", false);
+ MAX_RECORD_AVAILABILITY_LAG_MS("maxRecordAvailabilityLagMs", false),
+
+ // Number of tables whose schema name is misconfigured in the table config
+ MISCONFIGURED_SCHEMA_TABLE_COUNT("misconfiguredSchemaTableCount", true),
+
+ // Number of tables without a schema
+ TABLE_WITHOUT_SCHEMA_COUNT("tableWithoutSchemaCount", true),
+
+ // Number of tables whose schema name got fixed
+ FIXED_SCHEMA_TABLE_COUNT("fixedSchemaTableCount", true),
+
+ // Number of tables that we want to fix but failed to copy schema from old schema name to new schema name
+ FAILED_TO_COPY_SCHEMA_COUNT("failedToCopySchemaCount", true),
+
+ // Number of tables that we want to fix but failed to update table config
+ FAILED_TO_UPDATE_TABLE_CONFIG_COUNT("failedToUpdateTableConfigCount", true);
private final String _gaugeName;
private final String _unit;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 240e8d662b..3b2565f020 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -35,8 +35,10 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.AccessOption;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
@@ -47,13 +49,16 @@ import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.model.Message;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.task.TaskDriver;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.function.FunctionRegistry;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -62,9 +67,11 @@ import
org.apache.pinot.common.minion.InMemoryTaskManagerStatusCache;
import org.apache.pinot.common.minion.TaskGeneratorMostRecentRunInfo;
import org.apache.pinot.common.minion.TaskManagerStatusCache;
import org.apache.pinot.common.utils.PinotAppConfigs;
+import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.TlsUtils;
+import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.helix.LeadControllerUtils;
@@ -105,6 +112,7 @@ import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
@@ -117,7 +125,9 @@ import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.NetUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+import org.apache.zookeeper.data.Stat;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -525,6 +535,10 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
throw new RuntimeException("Unable to start controller due to existing
HLC tables!");
}
+ // One time job to fix schema name in all tables
+ // This method can be removed after the next major release.
+ fixSchemaNameInTableConfig();
+
_controllerMetrics.addCallbackGauge("dataDir.exists", () -> new
File(_config.getDataDir()).exists() ? 1L : 0L);
_controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> {
File dataDir = new File(_config.getDataDir());
@@ -549,6 +563,109 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
_serviceStatusCallbackList.add(generateServiceStatusCallback(_helixParticipantManager));
}
+ /**
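+ * Fixes table configs whose schema name differs from the raw table name. If no schema named after the raw table
+ * exists, the referenced schema is first copied to that name; the schema name is then removed from the table config.
+ * TODO: in the next release, maybe 2.0.0, we can remove this method. Meanwhile we can delete the orphan schemas
+ * that have existed longer than a certain time period.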
+ *
+ */
+ @VisibleForTesting
+ public void fixSchemaNameInTableConfig() {
+ AtomicInteger misconfiguredTableCount = new AtomicInteger();
+ AtomicInteger tableWithoutSchemaCount = new AtomicInteger();
+ AtomicInteger fixedSchemaTableCount = new AtomicInteger();
+ AtomicInteger failedToCopySchemaCount = new AtomicInteger();
+ AtomicInteger failedToUpdateTableConfigCount = new AtomicInteger();
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
_helixResourceManager.getPropertyStore();
+
+ List<String> allTables = _helixResourceManager.getAllTables();
+ allTables.forEach(tableNameWithType -> {
+ String tableConfigPath =
ZKMetadataProvider.constructPropertyStorePathForResourceConfig(tableNameWithType);
+ Stat tableConfigStat = new Stat();
+ ZNRecord tableConfigZNRecord = propertyStore.get(tableConfigPath,
tableConfigStat, AccessOption.PERSISTENT);
+ if (tableConfigZNRecord == null) {
+ // This might be due to table deletion, just log it here.
+ LOGGER.warn("Failed to find table config for table: {}, the table
likely already got deleted",
+ tableNameWithType);
+ return;
+ }
+ TableConfig tableConfig;
+ try {
+ tableConfig = TableConfigUtils.fromZNRecord(tableConfigZNRecord);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception constructing table config from ZNRecord
for table: {}", tableNameWithType, e);
+ return;
+ }
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ String schemaPath =
ZKMetadataProvider.constructPropertyStorePathForSchema(rawTableName);
+ boolean schemaExists = propertyStore.exists(schemaPath,
AccessOption.PERSISTENT);
+ String existSchemaName =
tableConfig.getValidationConfig().getSchemaName();
+ if (existSchemaName == null || existSchemaName.equals(rawTableName)) {
+ // Although the table config is valid, we still need to ensure the schema exists
+ if (!schemaExists) {
+ LOGGER.warn("Failed to find schema for table: {}",
tableNameWithType);
+ tableWithoutSchemaCount.getAndIncrement();
+ return;
+ }
+ // Table config is already in good status
+ return;
+ }
+ misconfiguredTableCount.getAndIncrement();
+ if (schemaExists) {
+ // If a schema named `rawTableName` already exists, then likely this is a misconfiguration.
+ // Reset schema name in table config to null to let the table point to the existing schema.
+ LOGGER.warn("Schema: {} already exists, fix the schema name in table
config from {} to null", rawTableName,
+ existSchemaName);
+ } else {
+ // Copy the schema that the table currently refers to into `rawTableName`, since no schema with that name exists
+ Schema schema = _helixResourceManager.getSchema(existSchemaName);
+ if (schema == null) {
+ LOGGER.warn("Failed to find schema: {} for table: {}",
existSchemaName, tableNameWithType);
+ tableWithoutSchemaCount.getAndIncrement();
+ return;
+ }
+ schema.setSchemaName(rawTableName);
+ if (propertyStore.create(schemaPath, SchemaUtils.toZNRecord(schema),
AccessOption.PERSISTENT)) {
+ LOGGER.info("Copied schema: {} to {}", existSchemaName,
rawTableName);
+ } else {
+ LOGGER.warn("Failed to copy schema: {} to {}", existSchemaName,
rawTableName);
+ failedToCopySchemaCount.getAndIncrement();
+ return;
+ }
+ }
+ // Update table config to remove schema name
+ tableConfig.getValidationConfig().setSchemaName(null);
+ try {
+ tableConfigZNRecord = TableConfigUtils.toZNRecord(tableConfig);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception constructing ZNRecord from table config
for table: {}", tableNameWithType, e);
+ return;
+ }
+ if (propertyStore.set(tableConfigPath, tableConfigZNRecord,
tableConfigStat.getVersion(),
+ AccessOption.PERSISTENT)) {
+ LOGGER.info("Removed schema name from table config for table: {}",
tableNameWithType);
+ fixedSchemaTableCount.getAndIncrement();
+ } else {
+ LOGGER.warn("Failed to update table config for table: {}",
tableNameWithType);
+ failedToUpdateTableConfigCount.getAndIncrement();
+ }
+ });
+ LOGGER.info(
+ "Found {} tables misconfigured, {} tables without schema. Successfully
fixed schema for {} tables, failed to "
+ + "fix {} tables due to copy schema failure, failed to fix {}
tables due to update table config failure.",
+ misconfiguredTableCount.get(), tableWithoutSchemaCount.get(),
fixedSchemaTableCount.get(),
+ failedToCopySchemaCount.get(), failedToUpdateTableConfigCount.get());
+
+
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT,
+ misconfiguredTableCount.get());
+
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT,
tableWithoutSchemaCount.get());
+
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FIXED_SCHEMA_TABLE_COUNT,
fixedSchemaTableCount.get());
+
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT,
+ failedToCopySchemaCount.get());
+
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT,
+ failedToUpdateTableConfigCount.get());
+ }
+
private ServiceStatus.ServiceStatusCallback
generateServiceStatusCallback(HelixManager helixManager) {
return new ServiceStatus.ServiceStatusCallback() {
private volatile boolean _isStarted = false;
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java
new file mode 100644
index 0000000000..8d4e2077f0
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.cleanup;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.MetricValueUtils;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.controller.BaseControllerStarter;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.NetUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * This test can be deleted once {@link BaseControllerStarter#fixSchemaNameInTableConfig()} is deleted. Likely in 2.0.0.
+ */
+@Test(groups = "stateless")
+public class SchemaCleanupTaskStatelessTest extends ControllerTest {
+ @BeforeClass
+ public void setup()
+ throws Exception {
+ startZk();
+ startController();
+ startFakeBroker();
+ startFakeServer();
+ }
+
+ private void startFakeBroker()
+ throws Exception {
+ String brokerInstance = CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE +
NetUtils.getHostAddress() + "_"
+ + CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT;
+
+ // Create a fake broker instance and connect it to the Helix cluster as a participant
+ HelixManager brokerHelixManager =
+ HelixManagerFactory.getZKHelixManager(getHelixClusterName(),
brokerInstance, InstanceType.PARTICIPANT,
+ getZkUrl());
+ brokerHelixManager.connect();
+
+ // Add Helix tag to the broker
+
brokerHelixManager.getClusterManagmentTool().addInstanceTag(getHelixClusterName(),
brokerInstance,
+ TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
+ }
+
+ private void startFakeServer()
+ throws Exception {
+ String serverInstance = CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE +
NetUtils.getHostAddress() + "_"
+ + CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT;
+
+ // Create a fake server instance and connect it to the Helix cluster as a participant
+ HelixManager serverHelixManager = HelixManagerFactory
+ .getZKHelixManager(getHelixClusterName(), serverInstance,
InstanceType.PARTICIPANT, getZkUrl());
+ serverHelixManager.connect();
+
+ // Add Helix tag to the server
+
serverHelixManager.getClusterManagmentTool().addInstanceTag(getHelixClusterName(),
serverInstance,
+
TableNameBuilder.OFFLINE.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
+ }
+
+ @AfterClass
+ public void teardown() {
+ stopController();
+ stopZk();
+ }
+
+ @Test
+ public void testSchemaCleanupTask()
+ throws Exception {
+ PinotMetricUtils.cleanUp();
+ // 1. Add a schema
+ addSchema(createDummySchema("t1"));
+ addSchema(createDummySchema("t2"));
+ addSchema(createDummySchema("t3"));
+
+ // 2. Add a table with the schema name reference
+ addTableConfig(createDummyTableConfig("t1", "t1"));
+ addTableConfig(createDummyTableConfig("t2", "t2"));
+ addTableConfig(createDummyTableConfig("t3", "t3"));
+
+ _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t1",
"t2"));
+ _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t2",
"t3"));
+ _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t3",
"t1"));
+
+ // 3. Fix table schema
+ _controllerStarter.fixSchemaNameInTableConfig();
+
+ // 4. validate
+ assertEquals(getHelixResourceManager().getAllTables().size(), 3);
+ assertEquals(getHelixResourceManager().getSchemaNames().size(), 3);
+
+
assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName());
+
assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName());
+
assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName());
+
+
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+ ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 3);
+
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+ ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 0);
+
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+ ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 3);
+
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+ ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0);
+
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+ ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0);
+
+ // 5. Clean up
+ for (String table : getHelixResourceManager().getAllOfflineTables()) {
+ getHelixResourceManager().deleteOfflineTable(table);
+ }
+ for (String schema : getHelixResourceManager().getSchemaNames()) {
+ getHelixResourceManager().deleteSchema(schema);
+ }
+ }
+
+ @Test
+ public void testSchemaCleanupTaskNormalCase()
+ throws Exception {
+ PinotMetricUtils.cleanUp();
+ // 1. Add a schema
+ addSchema(createDummySchema("t1"));
+ addSchema(createDummySchema("t2"));
+ addSchema(createDummySchema("t3"));
+
+ assertEquals(getHelixResourceManager().getSchemaNames().size(), 3);
+
+ // 2. Add a table with the schema name reference
+ addTableConfig(createDummyTableConfig("t1", "t1"));
+ addTableConfig(createDummyTableConfig("t2", "t2"));
+ addTableConfig(createDummyTableConfig("t3", "t3"));
+
+ assertEquals(getHelixResourceManager().getAllTables().size(), 3);
+
+ // 3. Create new schemas and update table to new schema
+ addSchema(createDummySchema("t11"));
+ addSchema(createDummySchema("t21"));
+ addSchema(createDummySchema("t31"));
+ _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t1",
"t11"));
+ _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t2",
"t21"));
+ _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t3",
"t31"));
+
+ assertEquals(getHelixResourceManager().getAllTables().size(), 3);
+ assertEquals(getHelixResourceManager().getSchemaNames().size(), 6);
+
assertEquals(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName(),
"t11");
+
assertEquals(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName(),
"t21");
+
assertEquals(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName(),
"t31");
+
+ // 4. Delete schema t1, t2, t3, so we can check if those schemas are fixed later.
+ deleteSchema("t1");
+ deleteSchema("t2");
+ deleteSchema("t3");
+
+ assertEquals(getHelixResourceManager().getSchemaNames().size(), 3);
+
+ // 5. Fix table schema
+ _controllerStarter.fixSchemaNameInTableConfig();
+
+ // 6. Schemas t1, t2, t3 are re-created as copies, and the schema name is removed from every table config.
+ assertEquals(getHelixResourceManager().getAllTables().size(), 3);
+ assertEquals(getHelixResourceManager().getSchemaNames().size(), 6);
+ assertTrue(getHelixResourceManager().getSchemaNames().contains("t1"));
+ assertTrue(getHelixResourceManager().getSchemaNames().contains("t2"));
+ assertTrue(getHelixResourceManager().getSchemaNames().contains("t3"));
+
+
assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName());
+
assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName());
+
assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName());
+
+
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+ ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 3);
+
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+ ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 0);
+
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+ ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 3);
+
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+ ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0);
+
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+ ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0);
+
+ // 7. Clean up
+ for (String table : getHelixResourceManager().getAllOfflineTables()) {
+ getHelixResourceManager().deleteOfflineTable(table);
+ }
+ for (String schema : getHelixResourceManager().getSchemaNames()) {
+ getHelixResourceManager().deleteSchema(schema);
+ }
+ }
+
+ @Test
+ public void testMissingSchema()
+ throws Exception {
+ PinotMetricUtils.cleanUp();
+ // 1. Add a schema
+ addSchema(createDummySchema("t1"));
+ addSchema(createDummySchema("t2"));
+ addSchema(createDummySchema("t3"));
+
+ assertEquals(getHelixResourceManager().getSchemaNames().size(), 3);
+
+ // 2. Add tables without a schema name in the table config
+ addTableConfig(createDummyTableConfig("t1"));
+ addTableConfig(createDummyTableConfig("t2"));
+ addTableConfig(createDummyTableConfig("t3"));
+
+ assertEquals(getHelixResourceManager().getAllTables().size(), 3);
+
+ // 4. Delete schemas t1, t2, t3, so that the tables are left without a schema.
+ deleteSchema("t1");
+ deleteSchema("t2");
+ deleteSchema("t3");
+
+ assertEquals(getHelixResourceManager().getSchemaNames().size(), 0);
+
+ // 5. Fix table schema
+ _controllerStarter.fixSchemaNameInTableConfig();
+
+ // 6. We cannot fix schema
+ assertEquals(getHelixResourceManager().getAllTables().size(), 3);
+ assertEquals(getHelixResourceManager().getSchemaNames().size(), 0);
+
+
assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName());
+
assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName());
+
assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName());
+
+
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+ ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 0);
+
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+ ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 3);
+
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+ ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 0);
+
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+ ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0);
+
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+ ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0);
+
+ // 7. Clean up
+ for (String table : getHelixResourceManager().getAllOfflineTables()) {
+ getHelixResourceManager().deleteOfflineTable(table);
+ }
+ for (String schema : getHelixResourceManager().getSchemaNames()) {
+ getHelixResourceManager().deleteSchema(schema);
+ }
+ }
+
+ private TableConfig createDummyTableConfig(String table) {
+ return new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName(table)
+ .build();
+ }
+
+ private TableConfig createDummyTableConfig(String table, String schema) {
+ TableConfig tableConfig = createDummyTableConfig(table);
+ tableConfig.getValidationConfig().setSchemaName(schema);
+ return tableConfig;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]