kishoreg commented on a change in pull request #5780:
URL: https://github.com/apache/incubator-pinot/pull/5780#discussion_r463424152
##########
File path:
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
##########
@@ -18,190 +18,308 @@
*/
package org.apache.pinot.common.utils.helix;
-import java.util.Collection;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Caches table config and schema of a table.
- * At the start - loads all the table configs and schemas in map.
- * sets up a zookeeper listener that watches for any change and updates the
cache.
- * TODO: optimize to load only changed table configs/schema on a callback.
- * TODO: Table deletes are not handled as of now
- * Goal is to eventually grow this into a PinotClusterDataAccessor
+ * The {@code TableCache} caches all the table configs and schemas within the
cluster, and listens on ZK changes to keep
+ * them in sync. It also maintains the table name map and the column name map
for case-insensitive queries.
*/
public class TableCache {
private static final Logger LOGGER =
LoggerFactory.getLogger(TableCache.class);
+ private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
+ private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
+ private static final String SCHEMA_PARENT_PATH = "/SCHEMAS";
+ private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/";
+ private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX = "_offline";
+ private static final String LOWER_CASE_REALTIME_TABLE_SUFFIX = "_realtime";
- private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
- private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX =
"/CONFIGS/TABLE";
+ private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+ private final boolean _caseInsensitive;
+ // For case insensitive, key is lower case table name, value is actual table
name
+ private final Map<String, String> _tableNameMap;
- private ZkHelixPropertyStore<ZNRecord> _propertyStore;
- TableConfigChangeListener _tableConfigChangeListener;
- SchemaChangeListener _schemaChangeListener;
+ // Key is table name with type suffix
+ private final TableConfigChangeListener _tableConfigChangeListener = new
TableConfigChangeListener();
+ private final Map<String, TableConfig> _tableConfigMap = new
ConcurrentHashMap<>();
+ // Key is raw table name
+ private final SchemaChangeListener _schemaChangeListener = new
SchemaChangeListener();
+ private final Map<String, SchemaInfo> _schemaInfoMap = new
ConcurrentHashMap<>();
- public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean
caseInsensitive) {
_propertyStore = propertyStore;
- _schemaChangeListener = new SchemaChangeListener();
- _schemaChangeListener.refresh();
- _tableConfigChangeListener = new TableConfigChangeListener();
- _tableConfigChangeListener.refresh();
+ _caseInsensitive = caseInsensitive;
+ _tableNameMap = caseInsensitive ? new ConcurrentHashMap<>() : null;
+
+ synchronized (_tableConfigChangeListener) {
+ // Subscribe child changes before reading the data to avoid missing
changes
+ _propertyStore.subscribeChildChanges(TABLE_CONFIG_PARENT_PATH,
_tableConfigChangeListener);
+
+ List<String> tables =
_propertyStore.getChildNames(TABLE_CONFIG_PARENT_PATH, AccessOption.PERSISTENT);
+ if (CollectionUtils.isNotEmpty(tables)) {
+ List<String> pathsToAdd = new ArrayList<>(tables.size());
+ for (String tableNameWithType : tables) {
+ pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
+ }
+ addTableConfigs(pathsToAdd);
+ }
+ }
+
+ synchronized (_schemaChangeListener) {
+ // Subscribe child changes before reading the data to avoid missing
changes
+ _propertyStore.subscribeChildChanges(SCHEMA_PARENT_PATH,
_schemaChangeListener);
+
+ List<String> tables = _propertyStore.getChildNames(SCHEMA_PARENT_PATH,
AccessOption.PERSISTENT);
+ if (CollectionUtils.isNotEmpty(tables)) {
+ List<String> pathsToAdd = new ArrayList<>(tables.size());
+ for (String rawTableName : tables) {
+ pathsToAdd.add(SCHEMA_PATH_PREFIX + rawTableName);
+ }
+ addSchemas(pathsToAdd);
+ }
+ }
+
+ LOGGER.info("Initialized TableCache with caseInsensitive: {}",
caseInsensitive);
}
- public String getActualTableName(String tableName) {
- return
_tableConfigChangeListener._tableNameMap.getOrDefault(tableName.toLowerCase(),
tableName);
+ /**
+ * Returns {@code true} if the TableCache is case-insensitive, {@code false}
otherwise.
+ */
+ public boolean isCaseInsensitive() {
+ return _caseInsensitive;
}
- public boolean containsTable(String tableName) {
- return
_tableConfigChangeListener._tableNameMap.containsKey(tableName.toLowerCase());
+ /**
+ * For case-insensitive only, returns the actual table name for the given
case-insensitive table name (with or without
+ * type suffix), or {@code null} if the table does not exist.
+ */
+ @Nullable
+ public String getActualTableName(String caseInsensitiveTableName) {
+ Preconditions.checkState(_caseInsensitive, "TableCache is not
case-insensitive");
+ return _tableNameMap.get(caseInsensitiveTableName.toLowerCase());
}
- public String getActualColumnName(String tableName, String columnName) {
- String schemaName =
_tableConfigChangeListener._table2SchemaConfigMap.get(tableName.toLowerCase());
- if (schemaName != null) {
- String actualColumnName =
_schemaChangeListener.getColumnName(schemaName, columnName);
- // If actual column name doesn't exist in schema, then return the origin
column name.
- if (actualColumnName == null) {
- return columnName;
+ /**
+ * For case-insensitive only, returns a map from lower case column name to
actual column name for the given table, or
+ * {@code null} if the table schema does not exist.
+ */
+ @Nullable
+ public Map<String, String> getColumnNameMap(String rawTableName) {
+ Preconditions.checkState(_caseInsensitive, "TableCache is not
case-insensitive");
+ SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+ return schemaInfo != null ? schemaInfo._columnNameMap : null;
+ }
+
+ /**
+ * Returns the table config for the given table, or {@code null} if it does
not exist.
+ */
+ @Nullable
+ public TableConfig getTableConfig(String tableNameWithType) {
+ return _tableConfigMap.get(tableNameWithType);
+ }
+
+ /**
+ * Returns the schema for the given table, or {@code null} if it does not
exist.
+ */
+ @Nullable
+ public Schema getSchema(String rawTableName) {
+ SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+ return schemaInfo != null ? schemaInfo._schema : null;
+ }
+
+ private void addTableConfigs(List<String> paths) {
+ // Subscribe data changes before reading the data to avoid missing changes
+ for (String path : paths) {
+ _propertyStore.subscribeDataChanges(path, _tableConfigChangeListener);
+ }
+ List<ZNRecord> znRecords = _propertyStore.get(paths, null,
AccessOption.PERSISTENT);
+ for (ZNRecord znRecord : znRecords) {
+ if (znRecord != null) {
+ try {
+ putTableConfig(znRecord);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while adding table config for
ZNRecord: {}", znRecord.getId(), e);
+ }
}
- return actualColumnName;
}
- return columnName;
}
- public TableConfig getTableConfig(String tableName) {
- return _tableConfigChangeListener._tableConfigMap.get(tableName);
+ private void putTableConfig(ZNRecord znRecord)
+ throws IOException {
+ TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
+ String tableNameWithType = tableConfig.getTableName();
+ _tableConfigMap.put(tableNameWithType, tableConfig);
+ if (_caseInsensitive) {
+ _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType);
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
+ }
}
- class TableConfigChangeListener implements IZkChildListener, IZkDataListener
{
-
- Map<String, TableConfig> _tableConfigMap = new ConcurrentHashMap<>();
- Map<String, String> _tableNameMap = new ConcurrentHashMap<>();
- Map<String, String> _table2SchemaConfigMap = new ConcurrentHashMap<>();
-
- public synchronized void refresh() {
- try {
- //always subscribe first before reading, so that we dont miss any
changes
-
_propertyStore.subscribeChildChanges(PROPERTYSTORE_TABLE_CONFIGS_PREFIX,
_tableConfigChangeListener);
-
_propertyStore.subscribeDataChanges(PROPERTYSTORE_TABLE_CONFIGS_PREFIX,
_tableConfigChangeListener);
- List<ZNRecord> children =
- _propertyStore.getChildren(PROPERTYSTORE_TABLE_CONFIGS_PREFIX,
null, AccessOption.PERSISTENT);
- if (children != null) {
- for (ZNRecord znRecord : children) {
- try {
- TableConfig tableConfig =
TableConfigUtils.fromZNRecord(znRecord);
- String tableNameWithType = tableConfig.getTableName();
- _tableConfigMap.put(tableNameWithType, tableConfig);
- String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- //create case insensitive mapping
- _tableNameMap.put(tableNameWithType.toLowerCase(),
tableNameWithType);
- _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
- //create case insensitive mapping between table name and
schemaName
- _table2SchemaConfigMap.put(tableNameWithType.toLowerCase(),
rawTableName);
- _table2SchemaConfigMap.put(rawTableName.toLowerCase(),
rawTableName);
- } catch (Exception e) {
- LOGGER.warn("Exception loading table config for: {}: {}",
znRecord.getId(), e.getMessage());
- //ignore
- }
- }
+ private void removeTableConfig(String path) {
+ _propertyStore.unsubscribeDataChanges(path, _tableConfigChangeListener);
+ String tableNameWithType =
path.substring(TABLE_CONFIG_PATH_PREFIX.length());
+ _tableConfigMap.remove(tableNameWithType);
+ if (_caseInsensitive) {
+ _tableNameMap.remove(tableNameWithType.toLowerCase());
+ String lowerCaseRawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType).toLowerCase();
+ if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+ if (!_tableNameMap.containsKey(lowerCaseRawTableName +
LOWER_CASE_REALTIME_TABLE_SUFFIX)) {
+ _tableNameMap.remove(lowerCaseRawTableName);
+ }
+ } else {
+ if (!_tableNameMap.containsKey(lowerCaseRawTableName +
LOWER_CASE_OFFLINE_TABLE_SUFFIX)) {
+ _tableNameMap.remove(lowerCaseRawTableName);
}
- } catch (Exception e) {
- LOGGER.warn("Exception subscribing/reading tableconfigs", e);
- //ignore
}
}
+ }
+
+ private void addSchemas(List<String> paths) {
+ // Subscribe data changes before reading the data to avoid missing changes
+ for (String path : paths) {
+ _propertyStore.subscribeDataChanges(path, _schemaChangeListener);
+ }
+ List<ZNRecord> znRecords = _propertyStore.get(paths, null,
AccessOption.PERSISTENT);
+ for (ZNRecord znRecord : znRecords) {
+ if (znRecord != null) {
+ try {
+ putSchema(znRecord);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while adding schema for ZNRecord:
{}", znRecord.getId(), e);
+ }
+ }
+ }
+ }
+
+ private void putSchema(ZNRecord znRecord)
+ throws IOException {
+ Schema schema = SchemaUtils.fromZNRecord(znRecord);
+ String rawTableName = schema.getSchemaName();
+ if (_caseInsensitive) {
+ Map<String, String> columnNameMap = new HashMap<>();
+ for (String columnName : schema.getColumnNames()) {
+ columnNameMap.put(columnName.toLowerCase(), columnName);
+ }
+ _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap));
+ } else {
+ _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null));
+ }
+ }
+
+ private void removeSchema(String path) {
+ _propertyStore.unsubscribeDataChanges(path, _schemaChangeListener);
+ String rawTableName = path.substring(SCHEMA_PATH_PREFIX.length());
+ _schemaInfoMap.remove(rawTableName);
+ }
+
+ private class TableConfigChangeListener implements IZkChildListener,
IZkDataListener {
@Override
- public void handleChildChange(String s, List<String> list)
- throws Exception {
- refresh();
+ public synchronized void handleChildChange(String path, List<String>
tables) {
Review comment:
this will miss watches if there was a new table added while processing
the callback right?
##########
File path:
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
##########
@@ -18,190 +18,308 @@
*/
package org.apache.pinot.common.utils.helix;
-import java.util.Collection;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Caches table config and schema of a table.
- * At the start - loads all the table configs and schemas in map.
- * sets up a zookeeper listener that watches for any change and updates the
cache.
- * TODO: optimize to load only changed table configs/schema on a callback.
- * TODO: Table deletes are not handled as of now
- * Goal is to eventually grow this into a PinotClusterDataAccessor
+ * The {@code TableCache} caches all the table configs and schemas within the
cluster, and listens on ZK changes to keep
+ * them in sync. It also maintains the table name map and the column name map
for case-insensitive queries.
*/
public class TableCache {
private static final Logger LOGGER =
LoggerFactory.getLogger(TableCache.class);
+ private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
+ private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
+ private static final String SCHEMA_PARENT_PATH = "/SCHEMAS";
+ private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/";
+ private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX = "_offline";
+ private static final String LOWER_CASE_REALTIME_TABLE_SUFFIX = "_realtime";
- private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
- private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX =
"/CONFIGS/TABLE";
+ private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+ private final boolean _caseInsensitive;
+ // For case insensitive, key is lower case table name, value is actual table
name
+ private final Map<String, String> _tableNameMap;
- private ZkHelixPropertyStore<ZNRecord> _propertyStore;
- TableConfigChangeListener _tableConfigChangeListener;
- SchemaChangeListener _schemaChangeListener;
+ // Key is table name with type suffix
+ private final TableConfigChangeListener _tableConfigChangeListener = new
TableConfigChangeListener();
+ private final Map<String, TableConfig> _tableConfigMap = new
ConcurrentHashMap<>();
+ // Key is raw table name
+ private final SchemaChangeListener _schemaChangeListener = new
SchemaChangeListener();
+ private final Map<String, SchemaInfo> _schemaInfoMap = new
ConcurrentHashMap<>();
- public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean
caseInsensitive) {
_propertyStore = propertyStore;
- _schemaChangeListener = new SchemaChangeListener();
- _schemaChangeListener.refresh();
- _tableConfigChangeListener = new TableConfigChangeListener();
- _tableConfigChangeListener.refresh();
+ _caseInsensitive = caseInsensitive;
+ _tableNameMap = caseInsensitive ? new ConcurrentHashMap<>() : null;
+
+ synchronized (_tableConfigChangeListener) {
+ // Subscribe child changes before reading the data to avoid missing
changes
+ _propertyStore.subscribeChildChanges(TABLE_CONFIG_PARENT_PATH,
_tableConfigChangeListener);
+
+ List<String> tables =
_propertyStore.getChildNames(TABLE_CONFIG_PARENT_PATH, AccessOption.PERSISTENT);
+ if (CollectionUtils.isNotEmpty(tables)) {
+ List<String> pathsToAdd = new ArrayList<>(tables.size());
+ for (String tableNameWithType : tables) {
+ pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
+ }
+ addTableConfigs(pathsToAdd);
+ }
+ }
+
+ synchronized (_schemaChangeListener) {
+ // Subscribe child changes before reading the data to avoid missing
changes
+ _propertyStore.subscribeChildChanges(SCHEMA_PARENT_PATH,
_schemaChangeListener);
+
+ List<String> tables = _propertyStore.getChildNames(SCHEMA_PARENT_PATH,
AccessOption.PERSISTENT);
+ if (CollectionUtils.isNotEmpty(tables)) {
+ List<String> pathsToAdd = new ArrayList<>(tables.size());
+ for (String rawTableName : tables) {
+ pathsToAdd.add(SCHEMA_PATH_PREFIX + rawTableName);
+ }
+ addSchemas(pathsToAdd);
+ }
+ }
+
+ LOGGER.info("Initialized TableCache with caseInsensitive: {}",
caseInsensitive);
}
- public String getActualTableName(String tableName) {
- return
_tableConfigChangeListener._tableNameMap.getOrDefault(tableName.toLowerCase(),
tableName);
+ /**
+ * Returns {@code true} if the TableCache is case-insensitive, {@code false}
otherwise.
+ */
+ public boolean isCaseInsensitive() {
+ return _caseInsensitive;
}
- public boolean containsTable(String tableName) {
- return
_tableConfigChangeListener._tableNameMap.containsKey(tableName.toLowerCase());
+ /**
+ * For case-insensitive only, returns the actual table name for the given
case-insensitive table name (with or without
+ * type suffix), or {@code null} if the table does not exist.
+ */
+ @Nullable
+ public String getActualTableName(String caseInsensitiveTableName) {
+ Preconditions.checkState(_caseInsensitive, "TableCache is not
case-insensitive");
+ return _tableNameMap.get(caseInsensitiveTableName.toLowerCase());
}
- public String getActualColumnName(String tableName, String columnName) {
- String schemaName =
_tableConfigChangeListener._table2SchemaConfigMap.get(tableName.toLowerCase());
- if (schemaName != null) {
- String actualColumnName =
_schemaChangeListener.getColumnName(schemaName, columnName);
- // If actual column name doesn't exist in schema, then return the origin
column name.
- if (actualColumnName == null) {
- return columnName;
+ /**
+ * For case-insensitive only, returns a map from lower case column name to
actual column name for the given table, or
+ * {@code null} if the table schema does not exist.
+ */
+ @Nullable
+ public Map<String, String> getColumnNameMap(String rawTableName) {
+ Preconditions.checkState(_caseInsensitive, "TableCache is not
case-insensitive");
+ SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+ return schemaInfo != null ? schemaInfo._columnNameMap : null;
+ }
+
+ /**
+ * Returns the table config for the given table, or {@code null} if it does
not exist.
+ */
+ @Nullable
+ public TableConfig getTableConfig(String tableNameWithType) {
+ return _tableConfigMap.get(tableNameWithType);
+ }
+
+ /**
+ * Returns the schema for the given table, or {@code null} if it does not
exist.
+ */
+ @Nullable
+ public Schema getSchema(String rawTableName) {
+ SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+ return schemaInfo != null ? schemaInfo._schema : null;
+ }
+
+ private void addTableConfigs(List<String> paths) {
+ // Subscribe data changes before reading the data to avoid missing changes
+ for (String path : paths) {
+ _propertyStore.subscribeDataChanges(path, _tableConfigChangeListener);
+ }
+ List<ZNRecord> znRecords = _propertyStore.get(paths, null,
AccessOption.PERSISTENT);
+ for (ZNRecord znRecord : znRecords) {
+ if (znRecord != null) {
+ try {
+ putTableConfig(znRecord);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while adding table config for
ZNRecord: {}", znRecord.getId(), e);
+ }
}
- return actualColumnName;
}
- return columnName;
}
- public TableConfig getTableConfig(String tableName) {
- return _tableConfigChangeListener._tableConfigMap.get(tableName);
+ private void putTableConfig(ZNRecord znRecord)
+ throws IOException {
+ TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
+ String tableNameWithType = tableConfig.getTableName();
+ _tableConfigMap.put(tableNameWithType, tableConfig);
+ if (_caseInsensitive) {
+ _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType);
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
+ }
}
- class TableConfigChangeListener implements IZkChildListener, IZkDataListener
{
-
- Map<String, TableConfig> _tableConfigMap = new ConcurrentHashMap<>();
- Map<String, String> _tableNameMap = new ConcurrentHashMap<>();
- Map<String, String> _table2SchemaConfigMap = new ConcurrentHashMap<>();
-
- public synchronized void refresh() {
- try {
- //always subscribe first before reading, so that we dont miss any
changes
-
_propertyStore.subscribeChildChanges(PROPERTYSTORE_TABLE_CONFIGS_PREFIX,
_tableConfigChangeListener);
-
_propertyStore.subscribeDataChanges(PROPERTYSTORE_TABLE_CONFIGS_PREFIX,
_tableConfigChangeListener);
- List<ZNRecord> children =
- _propertyStore.getChildren(PROPERTYSTORE_TABLE_CONFIGS_PREFIX,
null, AccessOption.PERSISTENT);
- if (children != null) {
- for (ZNRecord znRecord : children) {
- try {
- TableConfig tableConfig =
TableConfigUtils.fromZNRecord(znRecord);
- String tableNameWithType = tableConfig.getTableName();
- _tableConfigMap.put(tableNameWithType, tableConfig);
- String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- //create case insensitive mapping
- _tableNameMap.put(tableNameWithType.toLowerCase(),
tableNameWithType);
- _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
- //create case insensitive mapping between table name and
schemaName
- _table2SchemaConfigMap.put(tableNameWithType.toLowerCase(),
rawTableName);
- _table2SchemaConfigMap.put(rawTableName.toLowerCase(),
rawTableName);
- } catch (Exception e) {
- LOGGER.warn("Exception loading table config for: {}: {}",
znRecord.getId(), e.getMessage());
- //ignore
- }
- }
+ private void removeTableConfig(String path) {
+ _propertyStore.unsubscribeDataChanges(path, _tableConfigChangeListener);
+ String tableNameWithType =
path.substring(TABLE_CONFIG_PATH_PREFIX.length());
+ _tableConfigMap.remove(tableNameWithType);
+ if (_caseInsensitive) {
+ _tableNameMap.remove(tableNameWithType.toLowerCase());
+ String lowerCaseRawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType).toLowerCase();
+ if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+ if (!_tableNameMap.containsKey(lowerCaseRawTableName +
LOWER_CASE_REALTIME_TABLE_SUFFIX)) {
+ _tableNameMap.remove(lowerCaseRawTableName);
+ }
+ } else {
+ if (!_tableNameMap.containsKey(lowerCaseRawTableName +
LOWER_CASE_OFFLINE_TABLE_SUFFIX)) {
+ _tableNameMap.remove(lowerCaseRawTableName);
}
- } catch (Exception e) {
- LOGGER.warn("Exception subscribing/reading tableconfigs", e);
- //ignore
}
}
+ }
+
+ private void addSchemas(List<String> paths) {
+ // Subscribe data changes before reading the data to avoid missing changes
+ for (String path : paths) {
+ _propertyStore.subscribeDataChanges(path, _schemaChangeListener);
+ }
+ List<ZNRecord> znRecords = _propertyStore.get(paths, null,
AccessOption.PERSISTENT);
+ for (ZNRecord znRecord : znRecords) {
+ if (znRecord != null) {
+ try {
+ putSchema(znRecord);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while adding schema for ZNRecord:
{}", znRecord.getId(), e);
+ }
+ }
+ }
+ }
+
+ private void putSchema(ZNRecord znRecord)
+ throws IOException {
+ Schema schema = SchemaUtils.fromZNRecord(znRecord);
+ String rawTableName = schema.getSchemaName();
+ if (_caseInsensitive) {
+ Map<String, String> columnNameMap = new HashMap<>();
+ for (String columnName : schema.getColumnNames()) {
+ columnNameMap.put(columnName.toLowerCase(), columnName);
+ }
+ _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap));
+ } else {
+ _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null));
+ }
+ }
+
+ private void removeSchema(String path) {
+ _propertyStore.unsubscribeDataChanges(path, _schemaChangeListener);
+ String rawTableName = path.substring(SCHEMA_PATH_PREFIX.length());
+ _schemaInfoMap.remove(rawTableName);
+ }
+
+ private class TableConfigChangeListener implements IZkChildListener,
IZkDataListener {
@Override
- public void handleChildChange(String s, List<String> list)
- throws Exception {
- refresh();
+ public synchronized void handleChildChange(String path, List<String>
tables) {
+ if (CollectionUtils.isEmpty(tables)) {
+ return;
+ }
+
+ // Only process new added table configs. Changed/removed table configs
are handled by other callbacks.
+ List<String> pathsToAdd = new ArrayList<>();
+ for (String tableNameWithType : tables) {
+ if (!_tableConfigMap.containsKey(tableNameWithType)) {
+ pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
+ }
+ }
+ if (!pathsToAdd.isEmpty()) {
+ addTableConfigs(pathsToAdd);
+ }
}
@Override
- public void handleDataChange(String s, Object o)
- throws Exception {
- refresh();
+ public synchronized void handleDataChange(String path, Object data) {
+ if (data != null) {
+ ZNRecord znRecord = (ZNRecord) data;
+ try {
+ putTableConfig(znRecord);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while refreshing table config for
ZNRecord: {}", znRecord.getId(), e);
+ }
+ }
}
@Override
- public void handleDataDeleted(String s)
- throws Exception {
- refresh();
+ public synchronized void handleDataDeleted(String path) {
+ // NOTE: The path here is the absolute ZK path instead of the relative
path to the property store.
+ String tableNameWithType = path.substring(path.lastIndexOf('/') + 1);
+ removeTableConfig(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
}
}
- class SchemaChangeListener implements IZkChildListener, IZkDataListener {
- Map<String, Map<String, String>> _schemaColumnMap = new
ConcurrentHashMap<>();
-
- public synchronized void refresh() {
Review comment:
this method was good, only thing we had to handle was calling subscribe
on each child node. This is a much better way to handle ZK callbacks instead of
trying to handle every callback with custom logic
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]