This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 63f7ad9  Enhance TableCache to support schema name different from table name (#7525)
63f7ad9 is described below

commit 63f7ad9b81688e31d0a047cd8b2aaf6a58c50bf3
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Oct 6 12:02:46 2021 -0700

    Enhance TableCache to support schema name different from table name (#7525)
    
    Enhance the TableCache to support the case where a table's schema name is not the same as its raw table name.
    We recommend configuring the schema name to be the same as the raw table name, but this is not enforced. This PR fixes the issue of not being able to find the schema in the TableCache when the schema name differs from the raw table name. Query optimizers rely on the schema from the TableCache to optimize the query.
---
 .../pinot/common/utils/helix/TableCache.java       |  76 ++++++++----
 .../pinot/controller/helix/TableCacheTest.java     | 130 ++++++++++++---------
 2 files changed, 128 insertions(+), 78 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
index 4709cca..58ca9ee 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
@@ -56,14 +56,19 @@ public class TableCache {
 
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final boolean _caseInsensitive;
-  // For case insensitive, key is lower case table name, value is actual table 
name
-  private final Map<String, String> _tableNameMap;
 
-  // Key is table name with type suffix
   private final TableConfigChangeListener _tableConfigChangeListener = new 
TableConfigChangeListener();
+  // Key is table name with type suffix, value is table config
   private final Map<String, TableConfig> _tableConfigMap = new 
ConcurrentHashMap<>();
-  // Key is raw table name
+  // Key is table name (with or without type suffix), value is schema name
+  // It only stores table with schema name not matching the raw table name
+  private final Map<String, String> _schemaNameMap = new ConcurrentHashMap<>();
+  // Key is lower case table name (with or without type suffix), value is 
actual table name
+  // For case-insensitive mode only
+  private final Map<String, String> _tableNameMap;
+
   private final SchemaChangeListener _schemaChangeListener = new 
SchemaChangeListener();
+  // Key is schema name, value is schema info
   private final Map<String, SchemaInfo> _schemaInfoMap = new 
ConcurrentHashMap<>();
 
   public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean 
caseInsensitive) {
@@ -126,7 +131,8 @@ public class TableCache {
   @Nullable
   public Map<String, String> getColumnNameMap(String rawTableName) {
     Preconditions.checkState(_caseInsensitive, "TableCache is not 
case-insensitive");
-    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+    String schemaName = _schemaNameMap.getOrDefault(rawTableName, 
rawTableName);
+    SchemaInfo schemaInfo = _schemaInfoMap.get(schemaName);
     return schemaInfo != null ? schemaInfo._columnNameMap : null;
   }
 
@@ -143,7 +149,8 @@ public class TableCache {
    */
   @Nullable
   public Schema getSchema(String rawTableName) {
-    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+    String schemaName = _schemaNameMap.getOrDefault(rawTableName, 
rawTableName);
+    SchemaInfo schemaInfo = _schemaInfoMap.get(schemaName);
     return schemaInfo != null ? schemaInfo._schema : null;
   }
 
@@ -169,9 +176,18 @@ public class TableCache {
     TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
     String tableNameWithType = tableConfig.getTableName();
     _tableConfigMap.put(tableNameWithType, tableConfig);
+
+    String schemaName = tableConfig.getValidationConfig().getSchemaName();
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    if (schemaName != null && !schemaName.equals(rawTableName)) {
+      _schemaNameMap.put(tableNameWithType, schemaName);
+      _schemaNameMap.put(rawTableName, schemaName);
+    } else {
+      removeSchemaName(tableNameWithType);
+    }
+
     if (_caseInsensitive) {
       _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType);
-      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
       _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
     }
   }
@@ -180,6 +196,7 @@ public class TableCache {
     _propertyStore.unsubscribeDataChanges(path, _tableConfigChangeListener);
     String tableNameWithType = 
path.substring(TABLE_CONFIG_PATH_PREFIX.length());
     _tableConfigMap.remove(tableNameWithType);
+    removeSchemaName(tableNameWithType);
     if (_caseInsensitive) {
       _tableNameMap.remove(tableNameWithType.toLowerCase());
       String lowerCaseRawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType).toLowerCase();
@@ -195,6 +212,21 @@ public class TableCache {
     }
   }
 
+  private void removeSchemaName(String tableNameWithType) {
+    if (_schemaNameMap.remove(tableNameWithType) != null) {
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+        if 
(!_schemaNameMap.containsKey(TableNameBuilder.REALTIME.tableNameWithType(rawTableName)))
 {
+          _schemaNameMap.remove(rawTableName);
+        }
+      } else {
+        if 
(!_schemaNameMap.containsKey(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)))
 {
+          _schemaNameMap.remove(rawTableName);
+        }
+      }
+    }
+  }
+
   private void addSchemas(List<String> paths) {
     // Subscribe data changes before reading the data to avoid missing changes
     for (String path : paths) {
@@ -215,35 +247,35 @@ public class TableCache {
   private void putSchema(ZNRecord znRecord)
       throws IOException {
     Schema schema = SchemaUtils.fromZNRecord(znRecord);
-    String rawTableName = schema.getSchemaName();
+    String schemaName = schema.getSchemaName();
     if (_caseInsensitive) {
       Map<String, String> columnNameMap = new HashMap<>();
       for (String columnName : schema.getColumnNames()) {
         columnNameMap.put(columnName.toLowerCase(), columnName);
       }
-      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap));
+      _schemaInfoMap.put(schemaName, new SchemaInfo(schema, columnNameMap));
     } else {
-      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null));
+      _schemaInfoMap.put(schemaName, new SchemaInfo(schema, null));
     }
   }
 
   private void removeSchema(String path) {
     _propertyStore.unsubscribeDataChanges(path, _schemaChangeListener);
-    String rawTableName = path.substring(SCHEMA_PATH_PREFIX.length());
-    _schemaInfoMap.remove(rawTableName);
+    String schemaName = path.substring(SCHEMA_PATH_PREFIX.length());
+    _schemaInfoMap.remove(schemaName);
   }
 
   private class TableConfigChangeListener implements IZkChildListener, 
IZkDataListener {
 
     @Override
-    public synchronized void handleChildChange(String path, List<String> 
tables) {
-      if (CollectionUtils.isEmpty(tables)) {
+    public synchronized void handleChildChange(String path, List<String> 
tableNamesWithType) {
+      if (CollectionUtils.isEmpty(tableNamesWithType)) {
         return;
       }
 
       // Only process new added table configs. Changed/removed table configs 
are handled by other callbacks.
       List<String> pathsToAdd = new ArrayList<>();
-      for (String tableNameWithType : tables) {
+      for (String tableNameWithType : tableNamesWithType) {
         if (!_tableConfigMap.containsKey(tableNameWithType)) {
           pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
         }
@@ -276,16 +308,16 @@ public class TableCache {
   private class SchemaChangeListener implements IZkChildListener, 
IZkDataListener {
 
     @Override
-    public synchronized void handleChildChange(String path, List<String> 
tables) {
-      if (CollectionUtils.isEmpty(tables)) {
+    public synchronized void handleChildChange(String path, List<String> 
schemaNames) {
+      if (CollectionUtils.isEmpty(schemaNames)) {
         return;
       }
 
       // Only process new added schemas. Changed/removed schemas are handled 
by other callbacks.
       List<String> pathsToAdd = new ArrayList<>();
-      for (String rawTableName : tables) {
-        if (!_schemaInfoMap.containsKey(rawTableName)) {
-          pathsToAdd.add(SCHEMA_PATH_PREFIX + rawTableName);
+      for (String schemaName : schemaNames) {
+        if (!_schemaInfoMap.containsKey(schemaName)) {
+          pathsToAdd.add(SCHEMA_PATH_PREFIX + schemaName);
         }
       }
       if (!pathsToAdd.isEmpty()) {
@@ -308,8 +340,8 @@ public class TableCache {
     @Override
     public synchronized void handleDataDeleted(String path) {
       // NOTE: The path here is the absolute ZK path instead of the relative 
path to the property store.
-      String rawTableName = path.substring(path.lastIndexOf('/') + 1);
-      removeSchema(SCHEMA_PATH_PREFIX + rawTableName);
+      String schemaName = path.substring(path.lastIndexOf('/') + 1);
+      removeSchema(SCHEMA_PATH_PREFIX + schemaName);
     }
   }
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index fcdc6fe..5306bc1 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -30,6 +30,7 @@ import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -40,13 +41,13 @@ import static org.testng.Assert.assertNull;
 
 
 public class TableCacheTest {
+  private static final String SCHEMA_NAME = "testSchema";
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+  private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
 
-  private static final String TABLE_NAME = "cacheTable";
-  private static final String OFFLINE_TABLE_NAME = TABLE_NAME + "_OFFLINE";
-  private static final String REALTIME_TABLE_NAME = TABLE_NAME + "_REALTIME";
-
-  private static final String MANGLED_TABLE_NAME = "cAcHeTaBlE";
-  private static final String MANGLED_OFFLINE_TABLE_NAME = MANGLED_TABLE_NAME 
+ "_oFfLiNe";
+  private static final String MANGLED_RAW_TABLE_NAME = "TeStTaBlE";
+  private static final String MANGLED_OFFLINE_TABLE_NAME = 
MANGLED_RAW_TABLE_NAME + "_oFfLiNe";
 
   @BeforeClass
   public void setUp()
@@ -59,84 +60,101 @@ public class TableCacheTest {
       throws Exception {
     TableCache tableCache = new 
TableCache(ControllerTestUtils.getPropertyStore(), true);
 
-    assertNull(tableCache.getActualTableName(TABLE_NAME));
-    assertNull(tableCache.getColumnNameMap(TABLE_NAME));
+    assertNull(tableCache.getSchema(SCHEMA_NAME));
+    assertNull(tableCache.getColumnNameMap(SCHEMA_NAME));
+    assertNull(tableCache.getSchema(RAW_TABLE_NAME));
+    assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
     assertNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME));
-    assertNull(tableCache.getSchema(TABLE_NAME));
+    assertNull(tableCache.getActualTableName(RAW_TABLE_NAME));
+
+    // Add a schema
+    Schema schema =
+        new 
Schema.SchemaBuilder().setSchemaName(SCHEMA_NAME).addSingleValueDimension("testColumn",
 DataType.INT)
+            .build();
+    ControllerTestUtils.getHelixResourceManager().addSchema(schema, false);
+    // Wait for at most 10 seconds for the callback to add the schema to the 
cache
+    TestUtils.waitForCondition(aVoid -> tableCache.getSchema(SCHEMA_NAME) != 
null, 10_000L,
+        "Failed to add the schema to the cache");
+    // Schema can be accessed by the schema name, but not by the table name 
because table config is not added yet
+    Map<String, String> expectedColumnMap = 
Collections.singletonMap("testcolumn", "testColumn");
+    assertEquals(tableCache.getSchema(SCHEMA_NAME), schema);
+    assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
+    assertNull(tableCache.getSchema(RAW_TABLE_NAME));
+    assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
+    // Case-insensitive table name are handled based on the table config 
instead of the schema
+    assertNull(tableCache.getActualTableName(RAW_TABLE_NAME));
 
     // Add a table config
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setSchemaName(SCHEMA_NAME).build();
     ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
     // Wait for at most 10 seconds for the callback to add the table config to 
the cache
     TestUtils.waitForCondition(aVoid -> 
tableCache.getTableConfig(OFFLINE_TABLE_NAME) != null, 10_000L,
         "Failed to add the table config to the cache");
-    assertEquals(tableCache.getActualTableName(MANGLED_TABLE_NAME), 
TABLE_NAME);
+    assertEquals(tableCache.getTableConfig(OFFLINE_TABLE_NAME), tableConfig);
+    assertEquals(tableCache.getActualTableName(MANGLED_RAW_TABLE_NAME), 
RAW_TABLE_NAME);
     assertEquals(tableCache.getActualTableName(MANGLED_OFFLINE_TABLE_NAME), 
OFFLINE_TABLE_NAME);
     assertNull(tableCache.getActualTableName(REALTIME_TABLE_NAME));
-    assertEquals(tableCache.getTableConfig(OFFLINE_TABLE_NAME), tableConfig);
-    assertNull(tableCache.getColumnNameMap(TABLE_NAME));
-    assertNull(tableCache.getSchema(TABLE_NAME));
+    // Schema can be accessed by both the schema name and the raw table name
+    assertEquals(tableCache.getSchema(SCHEMA_NAME), schema);
+    assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
+    assertEquals(tableCache.getSchema(RAW_TABLE_NAME), schema);
+    assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), 
expectedColumnMap);
+
+    // Update the schema
+    schema.addField(new DimensionFieldSpec("newColumn", DataType.LONG, true));
+    ControllerTestUtils.getHelixResourceManager().updateSchema(schema, false);
+    // Wait for at most 10 seconds for the callback to update the schema in 
the cache
+    // NOTE: schema should never be null during the transitioning
+    TestUtils.waitForCondition(aVoid -> 
Preconditions.checkNotNull(tableCache.getSchema(SCHEMA_NAME)).equals(schema),
+        10_000L, "Failed to update the schema in the cache");
+    // Schema can be accessed by both the schema name and the raw table name
+    expectedColumnMap = new HashMap<>();
+    expectedColumnMap.put("testcolumn", "testColumn");
+    expectedColumnMap.put("newcolumn", "newColumn");
+    assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
+    assertEquals(tableCache.getSchema(RAW_TABLE_NAME), schema);
+    assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), 
expectedColumnMap);
 
-    // Update the table config
-    
tableConfig.getIndexingConfig().setCreateInvertedIndexDuringSegmentGeneration(true);
+    // Update the table config and drop the schema name
+    tableConfig.getValidationConfig().setSchemaName(null);
     
ControllerTestUtils.getHelixResourceManager().updateTableConfig(tableConfig);
     // Wait for at most 10 seconds for the callback to update the table config 
in the cache
     // NOTE: Table config should never be null during the transitioning
     TestUtils.waitForCondition(
         aVoid -> 
Preconditions.checkNotNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME)).equals(tableConfig),
 10_000L,
         "Failed to update the table config in the cache");
-    assertEquals(tableCache.getActualTableName(MANGLED_TABLE_NAME), 
TABLE_NAME);
+    assertEquals(tableCache.getActualTableName(MANGLED_RAW_TABLE_NAME), 
RAW_TABLE_NAME);
     assertEquals(tableCache.getActualTableName(MANGLED_OFFLINE_TABLE_NAME), 
OFFLINE_TABLE_NAME);
     assertNull(tableCache.getActualTableName(REALTIME_TABLE_NAME));
-    assertNull(tableCache.getColumnNameMap(TABLE_NAME));
-    assertNull(tableCache.getSchema(TABLE_NAME));
+    // After dropping the schema name from the table config, schema can only 
be accessed by the schema name, but not by
+    // the table name
+    assertEquals(tableCache.getSchema(SCHEMA_NAME), schema);
+    assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
+    assertNull(tableCache.getSchema(RAW_TABLE_NAME));
+    assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
 
     // Remove the table config
-    
ControllerTestUtils.getHelixResourceManager().deleteOfflineTable(TABLE_NAME);
+    
ControllerTestUtils.getHelixResourceManager().deleteOfflineTable(RAW_TABLE_NAME);
     // Wait for at most 10 seconds for the callback to remove the table config 
from the cache
     TestUtils.waitForCondition(aVoid -> 
tableCache.getTableConfig(OFFLINE_TABLE_NAME) == null, 10_000L,
         "Failed to remove the table config from the cache");
-    assertNull(tableCache.getActualTableName(TABLE_NAME));
-    assertNull(tableCache.getColumnNameMap(TABLE_NAME));
-    assertNull(tableCache.getSchema(TABLE_NAME));
-
-    // Add a schema
-    Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
-        .addSingleValueDimension("testColumn", DataType.INT)
-        .build();
-    ControllerTestUtils.getHelixResourceManager().addSchema(schema, false);
-    // Wait for at most 10 seconds for the callback to add the schema to the 
cache
-    TestUtils.waitForCondition(aVoid -> tableCache.getSchema(TABLE_NAME) != 
null, 10_000L,
-        "Failed to add the schema to the cache");
-    assertEquals(tableCache.getColumnNameMap(TABLE_NAME), 
Collections.singletonMap("testcolumn", "testColumn"));
-    assertEquals(tableCache.getSchema(TABLE_NAME), schema);
-    // Case-insensitive table name are handled based on the table config 
instead of the schema
-    assertNull(tableCache.getActualTableName(TABLE_NAME));
-    assertNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME));
-
-    // Update the schema
-    schema.addField(new DimensionFieldSpec("newColumn", DataType.LONG, true));
-    ControllerTestUtils.getHelixResourceManager().updateSchema(schema, false);
-    // Wait for at most 10 seconds for the callback to update the schema in 
the cache
-    // NOTE: schema should never be null during the transitioning
-    TestUtils.waitForCondition(aVoid -> 
Preconditions.checkNotNull(tableCache.getSchema(TABLE_NAME)).equals(schema),
-        10_000L, "Failed to update the schema in the cache");
-    Map<String, String> expectedColumnMap = new HashMap<>();
-    expectedColumnMap.put("testcolumn", "testColumn");
-    expectedColumnMap.put("newcolumn", "newColumn");
-    assertEquals(tableCache.getColumnNameMap(TABLE_NAME), expectedColumnMap);
-    // Case-insensitive table name are handled based on the table config 
instead of the schema
-    assertNull(tableCache.getActualTableName(TABLE_NAME));
-    assertNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME));
+    assertNull(tableCache.getActualTableName(RAW_TABLE_NAME));
+    // After dropping the table config, schema can only be accessed by the 
schema name, but not by the table name
+    assertEquals(tableCache.getSchema(SCHEMA_NAME), schema);
+    assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
+    assertNull(tableCache.getSchema(RAW_TABLE_NAME));
+    assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
 
     // Remove the schema
     ControllerTestUtils.getHelixResourceManager().deleteSchema(schema);
     // Wait for at most 10 seconds for the callback to remove the schema from 
the cache
-    TestUtils.waitForCondition(aVoid -> tableCache.getSchema(TABLE_NAME) == 
null, 10_000L,
+    TestUtils.waitForCondition(aVoid -> tableCache.getSchema(SCHEMA_NAME) == 
null, 10_000L,
         "Failed to remove the schema from the cache");
-    assertNull(tableCache.getActualTableName(TABLE_NAME));
-    assertNull(tableCache.getColumnNameMap(TABLE_NAME));
-    assertNull(tableCache.getSchema(TABLE_NAME));
+    assertNull(tableCache.getSchema(SCHEMA_NAME));
+    assertNull(tableCache.getColumnNameMap(SCHEMA_NAME));
+    assertNull(tableCache.getSchema(RAW_TABLE_NAME));
+    assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
   }
 
   @AfterClass

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to