This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 308b371 In TableCache, invoke callback on config change listener
during registration (#8302)
308b371 is described below
commit 308b3719a5d685e26a01766460abb1fe826cbc59
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Mar 7 13:08:10 2022 -0800
In TableCache, invoke callback on config change listener during
registration (#8302)
Currently, when a config change listener is registered, the callback is
not triggered automatically. The caller has to set the current configs
into the change listener itself, which is hard to do correctly because other
changes might invoke the `onChange()` callback at the same time.
This PR changes the listener registration methods to directly call
`onChange()` with the current configs.
---
.../pinot/common/config/provider/TableCache.java | 52 ++++++++----
.../pinot/controller/helix/TableCacheTest.java | 92 +++++++++++++---------
.../spi/config/provider/PinotConfigProvider.java | 26 +++---
3 files changed, 107 insertions(+), 63 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index a7c56f1..040263e 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -19,15 +19,14 @@
package org.apache.pinot.common.config.provider;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
@@ -64,8 +63,9 @@ public class TableCache implements PinotConfigProvider {
private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX = "_offline";
private static final String LOWER_CASE_REALTIME_TABLE_SUFFIX = "_realtime";
- private final Set<TableConfigChangeListener> _tableConfigChangeListeners =
ConcurrentHashMap.newKeySet();
- private final Set<SchemaChangeListener> _schemaChangeListeners =
ConcurrentHashMap.newKeySet();
+ // NOTE: No need to use concurrent set because it is always accessed within
the ZK change listener lock
+ private final Set<TableConfigChangeListener> _tableConfigChangeListeners =
new HashSet<>();
+ private final Set<SchemaChangeListener> _schemaChangeListeners = new
HashSet<>();
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final boolean _caseInsensitive;
@@ -158,10 +158,13 @@ public class TableCache implements PinotConfigProvider {
}
@Override
- public List<TableConfig>
registerTableConfigChangeListener(TableConfigChangeListener
tableConfigChangeListener) {
+ public boolean registerTableConfigChangeListener(TableConfigChangeListener
tableConfigChangeListener) {
synchronized (_zkTableConfigChangeListener) {
- _tableConfigChangeListeners.add(tableConfigChangeListener);
- return Lists.newArrayList(_tableConfigMap.values());
+ boolean added =
_tableConfigChangeListeners.add(tableConfigChangeListener);
+ if (added) {
+ tableConfigChangeListener.onChange(getTableConfigs());
+ }
+ return added;
}
}
@@ -177,10 +180,13 @@ public class TableCache implements PinotConfigProvider {
}
@Override
- public List<Schema> registerSchemaChangeListener(SchemaChangeListener
schemaChangeListener) {
+ public boolean registerSchemaChangeListener(SchemaChangeListener
schemaChangeListener) {
synchronized (_zkSchemaChangeListener) {
- _schemaChangeListeners.add(schemaChangeListener);
- return _schemaInfoMap.values().stream().map(s ->
s._schema).collect(Collectors.toList());
+ boolean added = _schemaChangeListeners.add(schemaChangeListener);
+ if (added) {
+ schemaChangeListener.onChange(getSchemas());
+ }
+ return added;
}
}
@@ -315,15 +321,33 @@ public class TableCache implements PinotConfigProvider {
}
private void notifyTableConfigChangeListeners() {
- for (TableConfigChangeListener tableConfigChangeListener :
_tableConfigChangeListeners) {
-
tableConfigChangeListener.onChange(Lists.newArrayList(_tableConfigMap.values()));
+ if (!_tableConfigChangeListeners.isEmpty()) {
+ List<TableConfig> tableConfigs = getTableConfigs();
+ for (TableConfigChangeListener tableConfigChangeListener :
_tableConfigChangeListeners) {
+ tableConfigChangeListener.onChange(tableConfigs);
+ }
}
}
+ private List<TableConfig> getTableConfigs() {
+ return new ArrayList<>(_tableConfigMap.values());
+ }
+
private void notifySchemaChangeListeners() {
- for (SchemaChangeListener schemaChangeListener : _schemaChangeListeners) {
- schemaChangeListener.onChange(_schemaInfoMap.values().stream().map(s ->
s._schema).collect(Collectors.toList()));
+ if (!_schemaChangeListeners.isEmpty()) {
+ List<Schema> schemas = getSchemas();
+ for (SchemaChangeListener schemaChangeListener : _schemaChangeListeners)
{
+ schemaChangeListener.onChange(schemas);
+ }
+ }
+ }
+
+ private List<Schema> getSchemas() {
+ List<Schema> schemas = new ArrayList<>(_schemaInfoMap.size());
+ for (SchemaInfo schemaInfo : _schemaInfoMap.values()) {
+ schemas.add(schemaInfo._schema);
}
+ return schemas;
}
private class ZkTableConfigChangeListener implements IZkChildListener,
IZkDataListener {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index ace6417..874d993 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.controller.helix;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.controller.ControllerTestUtils;
import org.apache.pinot.spi.config.provider.SchemaChangeListener;
@@ -35,13 +34,11 @@ import
org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
+import static org.testng.Assert.*;
public class TableCacheTest {
@@ -96,11 +93,6 @@ public class TableCacheTest {
assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
// Case-insensitive table name are handled based on the table config
instead of the schema
assertNull(tableCache.getActualTableName(RAW_TABLE_NAME));
- TestSchemaChangeListener schemaChangeListener = new
TestSchemaChangeListener();
- List<Schema> schemas =
tableCache.registerSchemaChangeListener(schemaChangeListener);
- Assert.assertNotNull(schemas);
- Assert.assertEquals(schemas.size(), 1);
- Assert.assertEquals(schemas.get(0).getSchemaName(), SCHEMA_NAME);
// Add a table config
TableConfig tableConfig =
@@ -118,78 +110,100 @@ public class TableCacheTest {
assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME),
expectedColumnMap);
+
+ // Register the change listeners
TestTableConfigChangeListener tableConfigChangeListener = new
TestTableConfigChangeListener();
- List<TableConfig> tableConfigs =
tableCache.registerTableConfigChangeListener(tableConfigChangeListener);
- Assert.assertNotNull(tableConfigs);
- Assert.assertEquals(tableConfigs.size(), 1);
- Assert.assertEquals(tableConfigs.get(0).getTableName(),
OFFLINE_TABLE_NAME);
+
assertTrue(tableCache.registerTableConfigChangeListener(tableConfigChangeListener));
+ assertEquals(tableConfigChangeListener._tableConfigList.size(), 1);
+ assertEquals(tableConfigChangeListener._tableConfigList.get(0),
tableConfig);
+ TestSchemaChangeListener schemaChangeListener = new
TestSchemaChangeListener();
+ assertTrue(tableCache.registerSchemaChangeListener(schemaChangeListener));
+ assertEquals(schemaChangeListener._schemaList.size(), 1);
+ assertEquals(schemaChangeListener._schemaList.get(0), expectedSchema);
+ // Re-registering the change listeners should fail
+
assertFalse(tableCache.registerTableConfigChangeListener(tableConfigChangeListener));
+ assertFalse(tableCache.registerSchemaChangeListener(schemaChangeListener));
// Update the schema
schema.addField(new DimensionFieldSpec("newColumn", DataType.LONG, true));
ControllerTestUtils.getHelixResourceManager().updateSchema(schema, false);
// Wait for at most 10 seconds for the callback to update the schema in
the cache
- // NOTE: schema should never be null during the transitioning
+ // NOTE:
+ // - Schema should never be null during the transitioning
+ // - Schema change listener callback should always contain 1 schema
+ // - Verify if the callback is fully done by checking the schema change
listener because it is the last step of the
+ // callback handling
expectedSchema.addField(new DimensionFieldSpec("newColumn", DataType.LONG,
true));
- TestUtils.waitForCondition(
- aVoid ->
Objects.requireNonNull(tableCache.getSchema(SCHEMA_NAME)).equals(expectedSchema),
10_000L,
- "Failed to update the schema in the cache");
- // Schema can be accessed by both the schema name and the raw table name
expectedColumnMap.put("newcolumn", "newColumn");
+ TestUtils.waitForCondition(aVoid -> {
+ assertNotNull(tableCache.getSchema(SCHEMA_NAME));
+ assertEquals(schemaChangeListener._schemaList.size(), 1);
+ return schemaChangeListener._schemaList.get(0).equals(expectedSchema);
+ }, 10_000L, "Failed to update the schema in the cache");
+ // Schema can be accessed by both the schema name and the raw table name
+ assertEquals(tableCache.getSchema(SCHEMA_NAME), expectedSchema);
assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME),
expectedColumnMap);
- Assert.assertNotNull(schemaChangeListener._schemaList);
- Assert.assertEquals(schemaChangeListener._schemaList.size(), 1);
-
Assert.assertEquals(schemaChangeListener._schemaList.get(0).getSchemaName(),
SCHEMA_NAME);
// Update the table config and drop the schema name
tableConfig.getValidationConfig().setSchemaName(null);
ControllerTestUtils.getHelixResourceManager().updateTableConfig(tableConfig);
// Wait for at most 10 seconds for the callback to update the table config
in the cache
+ // NOTE:
+ // - Table config should never be null during the transitioning
+ // - Table config change listener callback should always contain 1 table
config
+ // - Verify if the callback is fully done by checking the table config
change listener because it is the last step of
+ // the callback handling
+ TestUtils.waitForCondition(aVoid -> {
+ assertNotNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME));
+ assertEquals(tableConfigChangeListener._tableConfigList.size(), 1);
+ return
tableConfigChangeListener._tableConfigList.get(0).equals(tableConfig);
+ }, 10_000L, "Failed to update the table config in the cache");
// After dropping the schema name from the table config, schema can only
be accessed by the schema name, but not by
// the table name
- // NOTE: Table config should never be null during the transitioning
- TestUtils.waitForCondition(
- aVoid ->
Objects.requireNonNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME)).equals(tableConfig)
- && tableCache.getSchema(RAW_TABLE_NAME) == null &&
tableCache.getColumnNameMap(RAW_TABLE_NAME) == null,
- 10_000L, "Failed to update the table config in the cache");
+ assertEquals(tableCache.getTableConfig(OFFLINE_TABLE_NAME), tableConfig);
+ assertNull(tableCache.getSchema(RAW_TABLE_NAME));
+ assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
assertEquals(tableCache.getActualTableName(MANGLED_RAW_TABLE_NAME),
RAW_TABLE_NAME);
assertEquals(tableCache.getActualTableName(MANGLED_OFFLINE_TABLE_NAME),
OFFLINE_TABLE_NAME);
assertNull(tableCache.getActualTableName(REALTIME_TABLE_NAME));
assertEquals(tableCache.getSchema(SCHEMA_NAME), expectedSchema);
assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
- Assert.assertNotNull(tableConfigChangeListener._tableConfigList);
- Assert.assertEquals(tableConfigChangeListener._tableConfigList.size(), 1);
-
Assert.assertEquals(tableConfigChangeListener._tableConfigList.get(0).getTableName(),
OFFLINE_TABLE_NAME);
// Remove the table config
ControllerTestUtils.getHelixResourceManager().deleteOfflineTable(RAW_TABLE_NAME);
// Wait for at most 10 seconds for the callback to remove the table config
from the cache
- TestUtils.waitForCondition(aVoid ->
tableCache.getTableConfig(OFFLINE_TABLE_NAME) == null
- && tableCache.getActualTableName(RAW_TABLE_NAME) == null, 10_000L,
+ // NOTE:
+ // - Verify if the callback is fully done by checking the table config
change listener because it is the last step of
+ // the callback handling
+ TestUtils.waitForCondition(aVoid ->
tableConfigChangeListener._tableConfigList.isEmpty(), 10_000L,
"Failed to remove the table config from the cache");
+ assertNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME));
+ assertNull(tableCache.getActualTableName(RAW_TABLE_NAME));
assertEquals(tableCache.getSchema(SCHEMA_NAME), expectedSchema);
assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
assertNull(tableCache.getSchema(RAW_TABLE_NAME));
assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
- Assert.assertNotNull(schemaChangeListener._schemaList);
- Assert.assertEquals(schemaChangeListener._schemaList.size(), 1);
- Assert.assertEquals(tableConfigChangeListener._tableConfigList.size(), 0);
// Remove the schema
ControllerTestUtils.getHelixResourceManager().deleteSchema(schema);
// Wait for at most 10 seconds for the callback to remove the schema from
the cache
- TestUtils.waitForCondition(aVoid -> tableCache.getSchema(SCHEMA_NAME) ==
null, 10_000L,
+ // NOTE:
+ // - Verify if the callback is fully done by checking the schema change
listener because it is the last step of the
+ // callback handling
+ TestUtils.waitForCondition(aVoid ->
schemaChangeListener._schemaList.isEmpty(), 10_000L,
"Failed to remove the schema from the cache");
+ assertNull(tableCache.getSchema(SCHEMA_NAME));
assertNull(tableCache.getColumnNameMap(SCHEMA_NAME));
assertNull(tableCache.getSchema(RAW_TABLE_NAME));
assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
- Assert.assertEquals(schemaChangeListener._schemaList.size(), 0);
- Assert.assertEquals(tableConfigChangeListener._tableConfigList.size(), 0);
+ assertEquals(schemaChangeListener._schemaList.size(), 0);
+ assertEquals(tableConfigChangeListener._tableConfigList.size(), 0);
}
private static class TestTableConfigChangeListener implements
TableConfigChangeListener {
- private List<TableConfig> _tableConfigList;
+ private volatile List<TableConfig> _tableConfigList;
@Override
public void onChange(List<TableConfig> tableConfigList) {
@@ -198,7 +212,7 @@ public class TableCacheTest {
}
private static class TestSchemaChangeListener implements
SchemaChangeListener {
- private List<Schema> _schemaList;
+ private volatile List<Schema> _schemaList;
@Override
public void onChange(List<Schema> schemaList) {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java
index abf3573..0400fe0 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java
@@ -24,31 +24,37 @@ import org.apache.pinot.spi.data.Schema;
/**
- * An interface for the provider of pinot table configs and schema
+ * An interface for the provider of pinot table configs and schemas.
*/
public interface PinotConfigProvider {
/**
- * Gets the tableConfig
+ * Returns the table config for the given table name with type suffix.
*/
TableConfig getTableConfig(String tableNameWithType);
/**
* Registers the {@link TableConfigChangeListener} and notifies it whenever
any changes (addition, update, removal)
- * to any of the table configs are detected
- * @return current list of tables
+ * to any of the table configs are detected. If the listener is successfully
registered,
+ * {@link TableConfigChangeListener#onChange(List)} will be invoked with the
current table configs.
+ *
+ * @return {@code true} if the listener is successfully registered, {@code
false} if the listener is already
+ * registered.
*/
- List<TableConfig>
registerTableConfigChangeListener(TableConfigChangeListener
tableConfigChangeListener);
+ boolean registerTableConfigChangeListener(TableConfigChangeListener
tableConfigChangeListener);
/**
- * Gets the schema
+ * Returns the schema for the given raw table name.
*/
Schema getSchema(String rawTableName);
/**
- * Registers the {@link SchemaChangeListener} and notifies it whenever any
changes (addition, update, removal) to
- * schemas are detected
- * @return current list of schemas
+ * Registers the {@link SchemaChangeListener} and notifies it whenever any
changes (addition, update, removal) to any
+ * of the schemas are detected. If the listener is successfully registered,
+ * {@link SchemaChangeListener#onChange(List)} will be invoked with the
current schemas.
+ *
+ * @return {@code true} if the listener is successfully registered, {@code
false} if the listener is already
+ * registered.
*/
- List<Schema> registerSchemaChangeListener(SchemaChangeListener
schemaChangeListener);
+ boolean registerSchemaChangeListener(SchemaChangeListener
schemaChangeListener);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]