This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 63f7ad9 Enhance TableCache to support schema name different from
table name (#7525)
63f7ad9 is described below
commit 63f7ad9b81688e31d0a047cd8b2aaf6a58c50bf3
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Oct 6 12:02:46 2021 -0700
Enhance TableCache to support schema name different from table name (#7525)
Enhance the TableCache to support the case where the table's schema name is not
the same as the raw table name.
We recommend configuring the schema name to be the same as the raw table name,
but this is not enforced. This PR fixes the issue of not being able to find the schema
in the TableCache when its name is not the same as the raw table name. Query
optimizers rely on the schema from the TableCache to optimize queries.
---
.../pinot/common/utils/helix/TableCache.java | 76 ++++++++----
.../pinot/controller/helix/TableCacheTest.java | 130 ++++++++++++---------
2 files changed, 128 insertions(+), 78 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
index 4709cca..58ca9ee 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
@@ -56,14 +56,19 @@ public class TableCache {
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final boolean _caseInsensitive;
- // For case insensitive, key is lower case table name, value is actual table
name
- private final Map<String, String> _tableNameMap;
- // Key is table name with type suffix
private final TableConfigChangeListener _tableConfigChangeListener = new
TableConfigChangeListener();
+ // Key is table name with type suffix, value is table config
private final Map<String, TableConfig> _tableConfigMap = new
ConcurrentHashMap<>();
- // Key is raw table name
+ // Key is table name (with or without type suffix), value is schema name
+ // It only stores table with schema name not matching the raw table name
+ private final Map<String, String> _schemaNameMap = new ConcurrentHashMap<>();
+ // Key is lower case table name (with or without type suffix), value is
actual table name
+ // For case-insensitive mode only
+ private final Map<String, String> _tableNameMap;
+
private final SchemaChangeListener _schemaChangeListener = new
SchemaChangeListener();
+ // Key is schema name, value is schema info
private final Map<String, SchemaInfo> _schemaInfoMap = new
ConcurrentHashMap<>();
public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean
caseInsensitive) {
@@ -126,7 +131,8 @@ public class TableCache {
@Nullable
public Map<String, String> getColumnNameMap(String rawTableName) {
Preconditions.checkState(_caseInsensitive, "TableCache is not
case-insensitive");
- SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+ String schemaName = _schemaNameMap.getOrDefault(rawTableName,
rawTableName);
+ SchemaInfo schemaInfo = _schemaInfoMap.get(schemaName);
return schemaInfo != null ? schemaInfo._columnNameMap : null;
}
@@ -143,7 +149,8 @@ public class TableCache {
*/
@Nullable
public Schema getSchema(String rawTableName) {
- SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+ String schemaName = _schemaNameMap.getOrDefault(rawTableName,
rawTableName);
+ SchemaInfo schemaInfo = _schemaInfoMap.get(schemaName);
return schemaInfo != null ? schemaInfo._schema : null;
}
@@ -169,9 +176,18 @@ public class TableCache {
TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
String tableNameWithType = tableConfig.getTableName();
_tableConfigMap.put(tableNameWithType, tableConfig);
+
+ String schemaName = tableConfig.getValidationConfig().getSchemaName();
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ if (schemaName != null && !schemaName.equals(rawTableName)) {
+ _schemaNameMap.put(tableNameWithType, schemaName);
+ _schemaNameMap.put(rawTableName, schemaName);
+ } else {
+ removeSchemaName(tableNameWithType);
+ }
+
if (_caseInsensitive) {
_tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType);
- String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
_tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
}
}
@@ -180,6 +196,7 @@ public class TableCache {
_propertyStore.unsubscribeDataChanges(path, _tableConfigChangeListener);
String tableNameWithType =
path.substring(TABLE_CONFIG_PATH_PREFIX.length());
_tableConfigMap.remove(tableNameWithType);
+ removeSchemaName(tableNameWithType);
if (_caseInsensitive) {
_tableNameMap.remove(tableNameWithType.toLowerCase());
String lowerCaseRawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType).toLowerCase();
@@ -195,6 +212,21 @@ public class TableCache {
}
}
+ private void removeSchemaName(String tableNameWithType) {
+ if (_schemaNameMap.remove(tableNameWithType) != null) {
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+ if
(!_schemaNameMap.containsKey(TableNameBuilder.REALTIME.tableNameWithType(rawTableName)))
{
+ _schemaNameMap.remove(rawTableName);
+ }
+ } else {
+ if
(!_schemaNameMap.containsKey(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)))
{
+ _schemaNameMap.remove(rawTableName);
+ }
+ }
+ }
+ }
+
private void addSchemas(List<String> paths) {
// Subscribe data changes before reading the data to avoid missing changes
for (String path : paths) {
@@ -215,35 +247,35 @@ public class TableCache {
private void putSchema(ZNRecord znRecord)
throws IOException {
Schema schema = SchemaUtils.fromZNRecord(znRecord);
- String rawTableName = schema.getSchemaName();
+ String schemaName = schema.getSchemaName();
if (_caseInsensitive) {
Map<String, String> columnNameMap = new HashMap<>();
for (String columnName : schema.getColumnNames()) {
columnNameMap.put(columnName.toLowerCase(), columnName);
}
- _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap));
+ _schemaInfoMap.put(schemaName, new SchemaInfo(schema, columnNameMap));
} else {
- _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null));
+ _schemaInfoMap.put(schemaName, new SchemaInfo(schema, null));
}
}
private void removeSchema(String path) {
_propertyStore.unsubscribeDataChanges(path, _schemaChangeListener);
- String rawTableName = path.substring(SCHEMA_PATH_PREFIX.length());
- _schemaInfoMap.remove(rawTableName);
+ String schemaName = path.substring(SCHEMA_PATH_PREFIX.length());
+ _schemaInfoMap.remove(schemaName);
}
private class TableConfigChangeListener implements IZkChildListener,
IZkDataListener {
@Override
- public synchronized void handleChildChange(String path, List<String>
tables) {
- if (CollectionUtils.isEmpty(tables)) {
+ public synchronized void handleChildChange(String path, List<String>
tableNamesWithType) {
+ if (CollectionUtils.isEmpty(tableNamesWithType)) {
return;
}
// Only process new added table configs. Changed/removed table configs
are handled by other callbacks.
List<String> pathsToAdd = new ArrayList<>();
- for (String tableNameWithType : tables) {
+ for (String tableNameWithType : tableNamesWithType) {
if (!_tableConfigMap.containsKey(tableNameWithType)) {
pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
}
@@ -276,16 +308,16 @@ public class TableCache {
private class SchemaChangeListener implements IZkChildListener,
IZkDataListener {
@Override
- public synchronized void handleChildChange(String path, List<String>
tables) {
- if (CollectionUtils.isEmpty(tables)) {
+ public synchronized void handleChildChange(String path, List<String>
schemaNames) {
+ if (CollectionUtils.isEmpty(schemaNames)) {
return;
}
// Only process new added schemas. Changed/removed schemas are handled
by other callbacks.
List<String> pathsToAdd = new ArrayList<>();
- for (String rawTableName : tables) {
- if (!_schemaInfoMap.containsKey(rawTableName)) {
- pathsToAdd.add(SCHEMA_PATH_PREFIX + rawTableName);
+ for (String schemaName : schemaNames) {
+ if (!_schemaInfoMap.containsKey(schemaName)) {
+ pathsToAdd.add(SCHEMA_PATH_PREFIX + schemaName);
}
}
if (!pathsToAdd.isEmpty()) {
@@ -308,8 +340,8 @@ public class TableCache {
@Override
public synchronized void handleDataDeleted(String path) {
// NOTE: The path here is the absolute ZK path instead of the relative
path to the property store.
- String rawTableName = path.substring(path.lastIndexOf('/') + 1);
- removeSchema(SCHEMA_PATH_PREFIX + rawTableName);
+ String schemaName = path.substring(path.lastIndexOf('/') + 1);
+ removeSchema(SCHEMA_PATH_PREFIX + schemaName);
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index fcdc6fe..5306bc1 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -30,6 +30,7 @@ import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -40,13 +41,13 @@ import static org.testng.Assert.assertNull;
public class TableCacheTest {
+ private static final String SCHEMA_NAME = "testSchema";
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+ private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
- private static final String TABLE_NAME = "cacheTable";
- private static final String OFFLINE_TABLE_NAME = TABLE_NAME + "_OFFLINE";
- private static final String REALTIME_TABLE_NAME = TABLE_NAME + "_REALTIME";
-
- private static final String MANGLED_TABLE_NAME = "cAcHeTaBlE";
- private static final String MANGLED_OFFLINE_TABLE_NAME = MANGLED_TABLE_NAME
+ "_oFfLiNe";
+ private static final String MANGLED_RAW_TABLE_NAME = "TeStTaBlE";
+ private static final String MANGLED_OFFLINE_TABLE_NAME =
MANGLED_RAW_TABLE_NAME + "_oFfLiNe";
@BeforeClass
public void setUp()
@@ -59,84 +60,101 @@ public class TableCacheTest {
throws Exception {
TableCache tableCache = new
TableCache(ControllerTestUtils.getPropertyStore(), true);
- assertNull(tableCache.getActualTableName(TABLE_NAME));
- assertNull(tableCache.getColumnNameMap(TABLE_NAME));
+ assertNull(tableCache.getSchema(SCHEMA_NAME));
+ assertNull(tableCache.getColumnNameMap(SCHEMA_NAME));
+ assertNull(tableCache.getSchema(RAW_TABLE_NAME));
+ assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
assertNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME));
- assertNull(tableCache.getSchema(TABLE_NAME));
+ assertNull(tableCache.getActualTableName(RAW_TABLE_NAME));
+
+ // Add a schema
+ Schema schema =
+ new
Schema.SchemaBuilder().setSchemaName(SCHEMA_NAME).addSingleValueDimension("testColumn",
DataType.INT)
+ .build();
+ ControllerTestUtils.getHelixResourceManager().addSchema(schema, false);
+ // Wait for at most 10 seconds for the callback to add the schema to the
cache
+ TestUtils.waitForCondition(aVoid -> tableCache.getSchema(SCHEMA_NAME) !=
null, 10_000L,
+ "Failed to add the schema to the cache");
+ // Schema can be accessed by the schema name, but not by the table name
because table config is not added yet
+ Map<String, String> expectedColumnMap =
Collections.singletonMap("testcolumn", "testColumn");
+ assertEquals(tableCache.getSchema(SCHEMA_NAME), schema);
+ assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
+ assertNull(tableCache.getSchema(RAW_TABLE_NAME));
+ assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
+ // Case-insensitive table name are handled based on the table config
instead of the schema
+ assertNull(tableCache.getActualTableName(RAW_TABLE_NAME));
// Add a table config
- TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setSchemaName(SCHEMA_NAME).build();
ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
// Wait for at most 10 seconds for the callback to add the table config to
the cache
TestUtils.waitForCondition(aVoid ->
tableCache.getTableConfig(OFFLINE_TABLE_NAME) != null, 10_000L,
"Failed to add the table config to the cache");
- assertEquals(tableCache.getActualTableName(MANGLED_TABLE_NAME),
TABLE_NAME);
+ assertEquals(tableCache.getTableConfig(OFFLINE_TABLE_NAME), tableConfig);
+ assertEquals(tableCache.getActualTableName(MANGLED_RAW_TABLE_NAME),
RAW_TABLE_NAME);
assertEquals(tableCache.getActualTableName(MANGLED_OFFLINE_TABLE_NAME),
OFFLINE_TABLE_NAME);
assertNull(tableCache.getActualTableName(REALTIME_TABLE_NAME));
- assertEquals(tableCache.getTableConfig(OFFLINE_TABLE_NAME), tableConfig);
- assertNull(tableCache.getColumnNameMap(TABLE_NAME));
- assertNull(tableCache.getSchema(TABLE_NAME));
+ // Schema can be accessed by both the schema name and the raw table name
+ assertEquals(tableCache.getSchema(SCHEMA_NAME), schema);
+ assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
+ assertEquals(tableCache.getSchema(RAW_TABLE_NAME), schema);
+ assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME),
expectedColumnMap);
+
+ // Update the schema
+ schema.addField(new DimensionFieldSpec("newColumn", DataType.LONG, true));
+ ControllerTestUtils.getHelixResourceManager().updateSchema(schema, false);
+ // Wait for at most 10 seconds for the callback to update the schema in
the cache
+ // NOTE: schema should never be null during the transitioning
+ TestUtils.waitForCondition(aVoid ->
Preconditions.checkNotNull(tableCache.getSchema(SCHEMA_NAME)).equals(schema),
+ 10_000L, "Failed to update the schema in the cache");
+ // Schema can be accessed by both the schema name and the raw table name
+ expectedColumnMap = new HashMap<>();
+ expectedColumnMap.put("testcolumn", "testColumn");
+ expectedColumnMap.put("newcolumn", "newColumn");
+ assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
+ assertEquals(tableCache.getSchema(RAW_TABLE_NAME), schema);
+ assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME),
expectedColumnMap);
- // Update the table config
-
tableConfig.getIndexingConfig().setCreateInvertedIndexDuringSegmentGeneration(true);
+ // Update the table config and drop the schema name
+ tableConfig.getValidationConfig().setSchemaName(null);
ControllerTestUtils.getHelixResourceManager().updateTableConfig(tableConfig);
// Wait for at most 10 seconds for the callback to update the table config
in the cache
// NOTE: Table config should never be null during the transitioning
TestUtils.waitForCondition(
aVoid ->
Preconditions.checkNotNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME)).equals(tableConfig),
10_000L,
"Failed to update the table config in the cache");
- assertEquals(tableCache.getActualTableName(MANGLED_TABLE_NAME),
TABLE_NAME);
+ assertEquals(tableCache.getActualTableName(MANGLED_RAW_TABLE_NAME),
RAW_TABLE_NAME);
assertEquals(tableCache.getActualTableName(MANGLED_OFFLINE_TABLE_NAME),
OFFLINE_TABLE_NAME);
assertNull(tableCache.getActualTableName(REALTIME_TABLE_NAME));
- assertNull(tableCache.getColumnNameMap(TABLE_NAME));
- assertNull(tableCache.getSchema(TABLE_NAME));
+ // After dropping the schema name from the table config, schema can only
be accessed by the schema name, but not by
+ // the table name
+ assertEquals(tableCache.getSchema(SCHEMA_NAME), schema);
+ assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
+ assertNull(tableCache.getSchema(RAW_TABLE_NAME));
+ assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
// Remove the table config
-
ControllerTestUtils.getHelixResourceManager().deleteOfflineTable(TABLE_NAME);
+
ControllerTestUtils.getHelixResourceManager().deleteOfflineTable(RAW_TABLE_NAME);
// Wait for at most 10 seconds for the callback to remove the table config
from the cache
TestUtils.waitForCondition(aVoid ->
tableCache.getTableConfig(OFFLINE_TABLE_NAME) == null, 10_000L,
"Failed to remove the table config from the cache");
- assertNull(tableCache.getActualTableName(TABLE_NAME));
- assertNull(tableCache.getColumnNameMap(TABLE_NAME));
- assertNull(tableCache.getSchema(TABLE_NAME));
-
- // Add a schema
- Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
- .addSingleValueDimension("testColumn", DataType.INT)
- .build();
- ControllerTestUtils.getHelixResourceManager().addSchema(schema, false);
- // Wait for at most 10 seconds for the callback to add the schema to the
cache
- TestUtils.waitForCondition(aVoid -> tableCache.getSchema(TABLE_NAME) !=
null, 10_000L,
- "Failed to add the schema to the cache");
- assertEquals(tableCache.getColumnNameMap(TABLE_NAME),
Collections.singletonMap("testcolumn", "testColumn"));
- assertEquals(tableCache.getSchema(TABLE_NAME), schema);
- // Case-insensitive table name are handled based on the table config
instead of the schema
- assertNull(tableCache.getActualTableName(TABLE_NAME));
- assertNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME));
-
- // Update the schema
- schema.addField(new DimensionFieldSpec("newColumn", DataType.LONG, true));
- ControllerTestUtils.getHelixResourceManager().updateSchema(schema, false);
- // Wait for at most 10 seconds for the callback to update the schema in
the cache
- // NOTE: schema should never be null during the transitioning
- TestUtils.waitForCondition(aVoid ->
Preconditions.checkNotNull(tableCache.getSchema(TABLE_NAME)).equals(schema),
- 10_000L, "Failed to update the schema in the cache");
- Map<String, String> expectedColumnMap = new HashMap<>();
- expectedColumnMap.put("testcolumn", "testColumn");
- expectedColumnMap.put("newcolumn", "newColumn");
- assertEquals(tableCache.getColumnNameMap(TABLE_NAME), expectedColumnMap);
- // Case-insensitive table name are handled based on the table config
instead of the schema
- assertNull(tableCache.getActualTableName(TABLE_NAME));
- assertNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME));
+ assertNull(tableCache.getActualTableName(RAW_TABLE_NAME));
+ // After dropping the table config, schema can only be accessed by the
schema name, but not by the table name
+ assertEquals(tableCache.getSchema(SCHEMA_NAME), schema);
+ assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
+ assertNull(tableCache.getSchema(RAW_TABLE_NAME));
+ assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
// Remove the schema
ControllerTestUtils.getHelixResourceManager().deleteSchema(schema);
// Wait for at most 10 seconds for the callback to remove the schema from
the cache
- TestUtils.waitForCondition(aVoid -> tableCache.getSchema(TABLE_NAME) ==
null, 10_000L,
+ TestUtils.waitForCondition(aVoid -> tableCache.getSchema(SCHEMA_NAME) ==
null, 10_000L,
"Failed to remove the schema from the cache");
- assertNull(tableCache.getActualTableName(TABLE_NAME));
- assertNull(tableCache.getColumnNameMap(TABLE_NAME));
- assertNull(tableCache.getSchema(TABLE_NAME));
+ assertNull(tableCache.getSchema(SCHEMA_NAME));
+ assertNull(tableCache.getColumnNameMap(SCHEMA_NAME));
+ assertNull(tableCache.getSchema(RAW_TABLE_NAME));
+ assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
}
@AfterClass
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]