This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 169aba9fc9 Fix schema name in table config during controller startup 
(#11574)
169aba9fc9 is described below

commit 169aba9fc9dded326c1ed9cb31cd7ad6e1752089
Author: Xiang Fu <[email protected]>
AuthorDate: Tue Oct 10 17:55:25 2023 -0700

    Fix schema name in table config during controller startup (#11574)
    
    * Adding a controller task to update schema name
    
    * Misc fixes
    
    * Fix linter
    
    ---------
    
    Co-authored-by: Xiaotian (Jackie) Jiang <[email protected]>
---
 .../pinot/common/metrics/ControllerGauge.java      |  17 +-
 .../pinot/controller/BaseControllerStarter.java    | 117 +++++++++
 .../cleanup/SchemaCleanupTaskStatelessTest.java    | 285 +++++++++++++++++++++
 3 files changed, 418 insertions(+), 1 deletion(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 64ced0203c..887b342110 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -132,7 +132,22 @@ public enum ControllerGauge implements 
AbstractMetrics.Gauge {
   MAX_RECORDS_LAG("maxRecordsLag", false),
 
   // Consumption availability lag in ms at a partition level
-  MAX_RECORD_AVAILABILITY_LAG_MS("maxRecordAvailabilityLagMs", false);
+  MAX_RECORD_AVAILABILITY_LAG_MS("maxRecordAvailabilityLagMs", false),
+
+  // Number of table schema got misconfigured
+  MISCONFIGURED_SCHEMA_TABLE_COUNT("misconfiguredSchemaTableCount", true),
+
+  // Number of table without schema
+  TABLE_WITHOUT_SCHEMA_COUNT("tableWithoutSchemaCount", true),
+
+  // Number of table schema got fixed
+  FIXED_SCHEMA_TABLE_COUNT("fixedSchemaTableCount", true),
+
+  // Number of tables that we want to fix but failed to copy schema from old 
schema name to new schema name
+  FAILED_TO_COPY_SCHEMA_COUNT("failedToCopySchemaCount", true),
+
+  // Number of tables that we want to fix but failed to update table config
+  FAILED_TO_UPDATE_TABLE_CONFIG_COUNT("failedToUpdateTableConfigCount", true);
 
   private final String _gaugeName;
   private final String _unit;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 240e8d662b..3b2565f020 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -35,8 +35,10 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.AccessOption;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
@@ -47,13 +49,16 @@ import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.model.Message;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.task.TaskDriver;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.http.config.SocketConfig;
 import org.apache.http.conn.HttpClientConnectionManager;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.function.FunctionRegistry;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -62,9 +67,11 @@ import 
org.apache.pinot.common.minion.InMemoryTaskManagerStatusCache;
 import org.apache.pinot.common.minion.TaskGeneratorMostRecentRunInfo;
 import org.apache.pinot.common.minion.TaskManagerStatusCache;
 import org.apache.pinot.common.utils.PinotAppConfigs;
+import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.ServiceStartableUtils;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.TlsUtils;
+import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.helix.LeadControllerUtils;
@@ -105,6 +112,7 @@ import org.apache.pinot.core.transport.ListenerConfig;
 import org.apache.pinot.core.util.ListenerConfigUtil;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.crypt.PinotCrypterFactory;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
@@ -117,7 +125,9 @@ import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.InstanceTypeUtils;
 import org.apache.pinot.spi.utils.NetUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+import org.apache.zookeeper.data.Stat;
 import org.glassfish.hk2.utilities.binding.AbstractBinder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -525,6 +535,10 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
       throw new RuntimeException("Unable to start controller due to existing 
HLC tables!");
     }
 
+    // One time job to fix schema name in all tables
+    // This method can be removed after the next major release.
+    fixSchemaNameInTableConfig();
+
     _controllerMetrics.addCallbackGauge("dataDir.exists", () -> new 
File(_config.getDataDir()).exists() ? 1L : 0L);
     _controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> {
       File dataDir = new File(_config.getDataDir());
@@ -549,6 +563,109 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     
_serviceStatusCallbackList.add(generateServiceStatusCallback(_helixParticipantManager));
   }
 
+  /**
+   * This method is used to fix table/schema names.
+   * TODO: in the next release, maybe 2.0.0, we can remove this method. 
Meanwhile we can delete the orphan schemas
+   * that has been existed longer than a certain time period.
+   *
+   */
+  @VisibleForTesting
+  public void fixSchemaNameInTableConfig() {
+    AtomicInteger misconfiguredTableCount = new AtomicInteger();
+    AtomicInteger tableWithoutSchemaCount = new AtomicInteger();
+    AtomicInteger fixedSchemaTableCount = new AtomicInteger();
+    AtomicInteger failedToCopySchemaCount = new AtomicInteger();
+    AtomicInteger failedToUpdateTableConfigCount = new AtomicInteger();
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
_helixResourceManager.getPropertyStore();
+
+    List<String> allTables = _helixResourceManager.getAllTables();
+    allTables.forEach(tableNameWithType -> {
+      String tableConfigPath = 
ZKMetadataProvider.constructPropertyStorePathForResourceConfig(tableNameWithType);
+      Stat tableConfigStat = new Stat();
+      ZNRecord tableConfigZNRecord = propertyStore.get(tableConfigPath, 
tableConfigStat, AccessOption.PERSISTENT);
+      if (tableConfigZNRecord == null) {
+        // This might due to table deletion, just log it here.
+        LOGGER.warn("Failed to find table config for table: {}, the table 
likely already got deleted",
+            tableNameWithType);
+        return;
+      }
+      TableConfig tableConfig;
+      try {
+        tableConfig = TableConfigUtils.fromZNRecord(tableConfigZNRecord);
+      } catch (Exception e) {
+        LOGGER.error("Caught exception constructing table config from ZNRecord 
for table: {}", tableNameWithType, e);
+        return;
+      }
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      String schemaPath = 
ZKMetadataProvider.constructPropertyStorePathForSchema(rawTableName);
+      boolean schemaExists = propertyStore.exists(schemaPath, 
AccessOption.PERSISTENT);
+      String existSchemaName = 
tableConfig.getValidationConfig().getSchemaName();
+      if (existSchemaName == null || existSchemaName.equals(rawTableName)) {
+        // Although the table config is valid, we still need to ensure the 
schema exists
+        if (!schemaExists) {
+          LOGGER.warn("Failed to find schema for table: {}", 
tableNameWithType);
+          tableWithoutSchemaCount.getAndIncrement();
+          return;
+        }
+        // Table config is already in good status
+        return;
+      }
+      misconfiguredTableCount.getAndIncrement();
+      if (schemaExists) {
+        // If a schema named `rawTableName` already exists, then likely this 
is a misconfiguration.
+        // Reset schema name in table config to null to let the table point to 
the existing schema.
+        LOGGER.warn("Schema: {} already exists, fix the schema name in table 
config from {} to null", rawTableName,
+            existSchemaName);
+      } else {
+        // Copy the schema current table referring to to `rawTableName` if it 
does not exist
+        Schema schema = _helixResourceManager.getSchema(existSchemaName);
+        if (schema == null) {
+          LOGGER.warn("Failed to find schema: {} for table: {}", 
existSchemaName, tableNameWithType);
+          tableWithoutSchemaCount.getAndIncrement();
+          return;
+        }
+        schema.setSchemaName(rawTableName);
+        if (propertyStore.create(schemaPath, SchemaUtils.toZNRecord(schema), 
AccessOption.PERSISTENT)) {
+          LOGGER.info("Copied schema: {} to {}", existSchemaName, 
rawTableName);
+        } else {
+          LOGGER.warn("Failed to copy schema: {} to {}", existSchemaName, 
rawTableName);
+          failedToCopySchemaCount.getAndIncrement();
+          return;
+        }
+      }
+      // Update table config to remove schema name
+      tableConfig.getValidationConfig().setSchemaName(null);
+      try {
+        tableConfigZNRecord = TableConfigUtils.toZNRecord(tableConfig);
+      } catch (Exception e) {
+        LOGGER.error("Caught exception constructing ZNRecord from table config 
for table: {}", tableNameWithType, e);
+        return;
+      }
+      if (propertyStore.set(tableConfigPath, tableConfigZNRecord, 
tableConfigStat.getVersion(),
+          AccessOption.PERSISTENT)) {
+        LOGGER.info("Removed schema name from table config for table: {}", 
tableNameWithType);
+        fixedSchemaTableCount.getAndIncrement();
+      } else {
+        LOGGER.warn("Failed to update table config for table: {}", 
tableNameWithType);
+        failedToUpdateTableConfigCount.getAndIncrement();
+      }
+    });
+    LOGGER.info(
+        "Found {} tables misconfigured, {} tables without schema. Successfully 
fixed schema for {} tables, failed to "
+            + "fix {} tables due to copy schema failure, failed to fix {} 
tables due to update table config failure.",
+        misconfiguredTableCount.get(), tableWithoutSchemaCount.get(), 
fixedSchemaTableCount.get(),
+        failedToCopySchemaCount.get(), failedToUpdateTableConfigCount.get());
+
+    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT,
+        misconfiguredTableCount.get());
+    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT,
 tableWithoutSchemaCount.get());
+    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FIXED_SCHEMA_TABLE_COUNT,
 fixedSchemaTableCount.get());
+    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT,
+        failedToCopySchemaCount.get());
+    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT,
+        failedToUpdateTableConfigCount.get());
+  }
+
   private ServiceStatus.ServiceStatusCallback 
generateServiceStatusCallback(HelixManager helixManager) {
     return new ServiceStatus.ServiceStatusCallback() {
       private volatile boolean _isStarted = false;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java
new file mode 100644
index 0000000000..8d4e2077f0
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.cleanup;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.MetricValueUtils;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.controller.BaseControllerStarter;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.NetUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * This test can be deleted once {@link 
BaseControllerStarter#fixSchemaNameInTableConfig()} is deleted. Likely in 2.0.0.
+ */
+@Test(groups = "stateless")
+public class SchemaCleanupTaskStatelessTest extends ControllerTest {
+  @BeforeClass
+  public void setup()
+      throws Exception {
+    startZk();
+    startController();
+    startFakeBroker();
+    startFakeServer();
+  }
+
+  private void startFakeBroker()
+      throws Exception {
+    String brokerInstance = CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + 
NetUtils.getHostAddress() + "_"
+        + CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT;
+
+    // Create server instance with the fake server state model
+    HelixManager brokerHelixManager =
+        HelixManagerFactory.getZKHelixManager(getHelixClusterName(), 
brokerInstance, InstanceType.PARTICIPANT,
+            getZkUrl());
+    brokerHelixManager.connect();
+
+    // Add Helix tag to the server
+    
brokerHelixManager.getClusterManagmentTool().addInstanceTag(getHelixClusterName(),
 brokerInstance,
+        TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
+  }
+
+  private void startFakeServer()
+      throws Exception {
+    String serverInstance = CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE + 
NetUtils.getHostAddress() + "_"
+        + CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT;
+
+    // Create server instance with the fake server state model
+    HelixManager serverHelixManager = HelixManagerFactory
+        .getZKHelixManager(getHelixClusterName(), serverInstance, 
InstanceType.PARTICIPANT, getZkUrl());
+    serverHelixManager.connect();
+
+    // Add Helix tag to the server
+    
serverHelixManager.getClusterManagmentTool().addInstanceTag(getHelixClusterName(),
 serverInstance,
+        
TableNameBuilder.OFFLINE.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
+  }
+
+  @AfterClass
+  public void teardown() {
+    stopController();
+    stopZk();
+  }
+
+  @Test
+  public void testSchemaCleanupTask()
+      throws Exception {
+    PinotMetricUtils.cleanUp();
+    // 1. Add a schema
+    addSchema(createDummySchema("t1"));
+    addSchema(createDummySchema("t2"));
+    addSchema(createDummySchema("t3"));
+
+    // 2. Add a table with the schema name reference
+    addTableConfig(createDummyTableConfig("t1", "t1"));
+    addTableConfig(createDummyTableConfig("t2", "t2"));
+    addTableConfig(createDummyTableConfig("t3", "t3"));
+
+    _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t1", 
"t2"));
+    _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t2", 
"t3"));
+    _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t3", 
"t1"));
+
+    // 3. Fix table schema
+    _controllerStarter.fixSchemaNameInTableConfig();
+
+    // 4. validate
+    assertEquals(getHelixResourceManager().getAllTables().size(), 3);
+    assertEquals(getHelixResourceManager().getSchemaNames().size(), 3);
+
+    
assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName());
+    
assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName());
+    
assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName());
+
+    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+        ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 3);
+    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+        ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 0);
+    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+        ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 3);
+    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+        ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0);
+    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+        ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0);
+
+    // 5. Clean up
+    for (String table : getHelixResourceManager().getAllOfflineTables()) {
+      getHelixResourceManager().deleteOfflineTable(table);
+    }
+    for (String schema : getHelixResourceManager().getSchemaNames()) {
+      getHelixResourceManager().deleteSchema(schema);
+    }
+  }
+
+  @Test
+  public void testSchemaCleanupTaskNormalCase()
+      throws Exception {
+    PinotMetricUtils.cleanUp();
+    // 1. Add a schema
+    addSchema(createDummySchema("t1"));
+    addSchema(createDummySchema("t2"));
+    addSchema(createDummySchema("t3"));
+
+    assertEquals(getHelixResourceManager().getSchemaNames().size(), 3);
+
+    // 2. Add a table with the schema name reference
+    addTableConfig(createDummyTableConfig("t1", "t1"));
+    addTableConfig(createDummyTableConfig("t2", "t2"));
+    addTableConfig(createDummyTableConfig("t3", "t3"));
+
+    assertEquals(getHelixResourceManager().getAllTables().size(), 3);
+
+    // 3. Create new schemas and update table to new schema
+    addSchema(createDummySchema("t11"));
+    addSchema(createDummySchema("t21"));
+    addSchema(createDummySchema("t31"));
+    _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t1", 
"t11"));
+    _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t2", 
"t21"));
+    _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t3", 
"t31"));
+
+    assertEquals(getHelixResourceManager().getAllTables().size(), 3);
+    assertEquals(getHelixResourceManager().getSchemaNames().size(), 6);
+    
assertEquals(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName(),
 "t11");
+    
assertEquals(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName(),
 "t21");
+    
assertEquals(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName(),
 "t31");
+
+    // 4. Delete schema t1, t2, t3, so we can check if those schemas are fixed 
later.
+    deleteSchema("t1");
+    deleteSchema("t2");
+    deleteSchema("t3");
+
+    assertEquals(getHelixResourceManager().getSchemaNames().size(), 3);
+
+    // 5. Fix table schema
+    _controllerStarter.fixSchemaNameInTableConfig();
+
+    // 6. All tables will directly set schema.
+    assertEquals(getHelixResourceManager().getAllTables().size(), 3);
+    assertEquals(getHelixResourceManager().getSchemaNames().size(), 6);
+    assertTrue(getHelixResourceManager().getSchemaNames().contains("t1"));
+    assertTrue(getHelixResourceManager().getSchemaNames().contains("t2"));
+    assertTrue(getHelixResourceManager().getSchemaNames().contains("t3"));
+
+    
assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName());
+    
assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName());
+    
assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName());
+
+    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+        ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 3);
+    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+        ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 0);
+    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+        ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 3);
+    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+        ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0);
+    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+        ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0);
+
+    // 7. Clean up
+    for (String table : getHelixResourceManager().getAllOfflineTables()) {
+      getHelixResourceManager().deleteOfflineTable(table);
+    }
+    for (String schema : getHelixResourceManager().getSchemaNames()) {
+      getHelixResourceManager().deleteSchema(schema);
+    }
+  }
+
+  @Test
+  public void testMissingSchema()
+      throws Exception {
+    PinotMetricUtils.cleanUp();
+    // 1. Add a schema
+    addSchema(createDummySchema("t1"));
+    addSchema(createDummySchema("t2"));
+    addSchema(createDummySchema("t3"));
+
+    assertEquals(getHelixResourceManager().getSchemaNames().size(), 3);
+
+    // 2. Add a table with the schema name reference
+    addTableConfig(createDummyTableConfig("t1"));
+    addTableConfig(createDummyTableConfig("t2"));
+    addTableConfig(createDummyTableConfig("t3"));
+
+    assertEquals(getHelixResourceManager().getAllTables().size(), 3);
+
+    // 4. Delete schema t1, t2, t3, so we can check if those schemas are fixed 
later.
+    deleteSchema("t1");
+    deleteSchema("t2");
+    deleteSchema("t3");
+
+    assertEquals(getHelixResourceManager().getSchemaNames().size(), 0);
+
+    // 5. Fix table schema
+    _controllerStarter.fixSchemaNameInTableConfig();
+
+    // 6. We cannot fix schema
+    assertEquals(getHelixResourceManager().getAllTables().size(), 3);
+    assertEquals(getHelixResourceManager().getSchemaNames().size(), 0);
+
+    
assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName());
+    
assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName());
+    
assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName());
+
+    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+        ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 0);
+    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+        ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 3);
+    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+        ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 0);
+    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+        ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0);
+    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
+        ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0);
+
+    // 7. Clean up
+    for (String table : getHelixResourceManager().getAllOfflineTables()) {
+      getHelixResourceManager().deleteOfflineTable(table);
+    }
+    for (String schema : getHelixResourceManager().getSchemaNames()) {
+      getHelixResourceManager().deleteSchema(schema);
+    }
+  }
+
+  private TableConfig createDummyTableConfig(String table) {
+    return new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(table)
+        .build();
+  }
+
+  private TableConfig createDummyTableConfig(String table, String schema) {
+    TableConfig tableConfig = createDummyTableConfig(table);
+    tableConfig.getValidationConfig().setSchemaName(schema);
+    return tableConfig;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to