This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 308b371  In TableCache, invoke callback on config change listener 
during registration (#8302)
308b371 is described below

commit 308b3719a5d685e26a01766460abb1fe826cbc59
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Mar 7 13:08:10 2022 -0800

    In TableCache, invoke callback on config change listener during 
registration (#8302)
    
    Currently when the config change listener is registered, the callback is 
not triggered automatically. It relies on the caller to set the current configs 
into the change listener, which is hard to use because there might be other 
changes invoking the `onChange()` callback at the same time.
    This PR changes the listener registration method to directly call the 
`onChange()` with the current configs.
---
 .../pinot/common/config/provider/TableCache.java   | 52 ++++++++----
 .../pinot/controller/helix/TableCacheTest.java     | 92 +++++++++++++---------
 .../spi/config/provider/PinotConfigProvider.java   | 26 +++---
 3 files changed, 107 insertions(+), 63 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index a7c56f1..040263e 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -19,15 +19,14 @@
 package org.apache.pinot.common.config.provider;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
@@ -64,8 +63,9 @@ public class TableCache implements PinotConfigProvider {
   private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX = "_offline";
   private static final String LOWER_CASE_REALTIME_TABLE_SUFFIX = "_realtime";
 
-  private final Set<TableConfigChangeListener> _tableConfigChangeListeners = 
ConcurrentHashMap.newKeySet();
-  private final Set<SchemaChangeListener> _schemaChangeListeners = 
ConcurrentHashMap.newKeySet();
+  // NOTE: No need to use concurrent set because it is always accessed within 
the ZK change listener lock
+  private final Set<TableConfigChangeListener> _tableConfigChangeListeners = 
new HashSet<>();
+  private final Set<SchemaChangeListener> _schemaChangeListeners = new 
HashSet<>();
 
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final boolean _caseInsensitive;
@@ -158,10 +158,13 @@ public class TableCache implements PinotConfigProvider {
   }
 
   @Override
-  public List<TableConfig> 
registerTableConfigChangeListener(TableConfigChangeListener 
tableConfigChangeListener) {
+  public boolean registerTableConfigChangeListener(TableConfigChangeListener 
tableConfigChangeListener) {
     synchronized (_zkTableConfigChangeListener) {
-      _tableConfigChangeListeners.add(tableConfigChangeListener);
-      return Lists.newArrayList(_tableConfigMap.values());
+      boolean added = 
_tableConfigChangeListeners.add(tableConfigChangeListener);
+      if (added) {
+        tableConfigChangeListener.onChange(getTableConfigs());
+      }
+      return added;
     }
   }
 
@@ -177,10 +180,13 @@ public class TableCache implements PinotConfigProvider {
   }
 
   @Override
-  public List<Schema> registerSchemaChangeListener(SchemaChangeListener 
schemaChangeListener) {
+  public boolean registerSchemaChangeListener(SchemaChangeListener 
schemaChangeListener) {
     synchronized (_zkSchemaChangeListener) {
-      _schemaChangeListeners.add(schemaChangeListener);
-      return _schemaInfoMap.values().stream().map(s -> 
s._schema).collect(Collectors.toList());
+      boolean added = _schemaChangeListeners.add(schemaChangeListener);
+      if (added) {
+        schemaChangeListener.onChange(getSchemas());
+      }
+      return added;
     }
   }
 
@@ -315,15 +321,33 @@ public class TableCache implements PinotConfigProvider {
   }
 
   private void notifyTableConfigChangeListeners() {
-    for (TableConfigChangeListener tableConfigChangeListener : 
_tableConfigChangeListeners) {
-      
tableConfigChangeListener.onChange(Lists.newArrayList(_tableConfigMap.values()));
+    if (!_tableConfigChangeListeners.isEmpty()) {
+      List<TableConfig> tableConfigs = getTableConfigs();
+      for (TableConfigChangeListener tableConfigChangeListener : 
_tableConfigChangeListeners) {
+        tableConfigChangeListener.onChange(tableConfigs);
+      }
     }
   }
 
+  private List<TableConfig> getTableConfigs() {
+    return new ArrayList<>(_tableConfigMap.values());
+  }
+
   private void notifySchemaChangeListeners() {
-    for (SchemaChangeListener schemaChangeListener : _schemaChangeListeners) {
-      schemaChangeListener.onChange(_schemaInfoMap.values().stream().map(s -> 
s._schema).collect(Collectors.toList()));
+    if (!_schemaChangeListeners.isEmpty()) {
+      List<Schema> schemas = getSchemas();
+      for (SchemaChangeListener schemaChangeListener : _schemaChangeListeners) 
{
+        schemaChangeListener.onChange(schemas);
+      }
+    }
+  }
+
+  private List<Schema> getSchemas() {
+    List<Schema> schemas = new ArrayList<>(_schemaInfoMap.size());
+    for (SchemaInfo schemaInfo : _schemaInfoMap.values()) {
+      schemas.add(schemaInfo._schema);
     }
+    return schemas;
   }
 
   private class ZkTableConfigChangeListener implements IZkChildListener, 
IZkDataListener {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index ace6417..874d993 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.controller.helix;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.controller.ControllerTestUtils;
 import org.apache.pinot.spi.config.provider.SchemaChangeListener;
@@ -35,13 +34,11 @@ import 
org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
+import static org.testng.Assert.*;
 
 
 public class TableCacheTest {
@@ -96,11 +93,6 @@ public class TableCacheTest {
     assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
     // Case-insensitive table name are handled based on the table config 
instead of the schema
     assertNull(tableCache.getActualTableName(RAW_TABLE_NAME));
-    TestSchemaChangeListener schemaChangeListener = new 
TestSchemaChangeListener();
-    List<Schema> schemas = 
tableCache.registerSchemaChangeListener(schemaChangeListener);
-    Assert.assertNotNull(schemas);
-    Assert.assertEquals(schemas.size(), 1);
-    Assert.assertEquals(schemas.get(0).getSchemaName(), SCHEMA_NAME);
 
     // Add a table config
     TableConfig tableConfig =
@@ -118,78 +110,100 @@ public class TableCacheTest {
     assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
     assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
     assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), 
expectedColumnMap);
+
+    // Register the change listeners
     TestTableConfigChangeListener tableConfigChangeListener = new 
TestTableConfigChangeListener();
-    List<TableConfig> tableConfigs = 
tableCache.registerTableConfigChangeListener(tableConfigChangeListener);
-    Assert.assertNotNull(tableConfigs);
-    Assert.assertEquals(tableConfigs.size(), 1);
-    Assert.assertEquals(tableConfigs.get(0).getTableName(), 
OFFLINE_TABLE_NAME);
+    
assertTrue(tableCache.registerTableConfigChangeListener(tableConfigChangeListener));
+    assertEquals(tableConfigChangeListener._tableConfigList.size(), 1);
+    assertEquals(tableConfigChangeListener._tableConfigList.get(0), 
tableConfig);
+    TestSchemaChangeListener schemaChangeListener = new 
TestSchemaChangeListener();
+    assertTrue(tableCache.registerSchemaChangeListener(schemaChangeListener));
+    assertEquals(schemaChangeListener._schemaList.size(), 1);
+    assertEquals(schemaChangeListener._schemaList.get(0), expectedSchema);
+    // Re-register the change listener should fail
+    
assertFalse(tableCache.registerTableConfigChangeListener(tableConfigChangeListener));
+    assertFalse(tableCache.registerSchemaChangeListener(schemaChangeListener));
 
     // Update the schema
     schema.addField(new DimensionFieldSpec("newColumn", DataType.LONG, true));
     ControllerTestUtils.getHelixResourceManager().updateSchema(schema, false);
     // Wait for at most 10 seconds for the callback to update the schema in 
the cache
-    // NOTE: schema should never be null during the transitioning
+    // NOTE:
+    // - Schema should never be null during the transitioning
+    // - Schema change listener callback should always contain 1 schema
+    // - Verify if the callback is fully done by checking the schema change 
lister because it is the last step of the
+    //   callback handling
     expectedSchema.addField(new DimensionFieldSpec("newColumn", DataType.LONG, 
true));
-    TestUtils.waitForCondition(
-        aVoid -> 
Objects.requireNonNull(tableCache.getSchema(SCHEMA_NAME)).equals(expectedSchema),
 10_000L,
-        "Failed to update the schema in the cache");
-    // Schema can be accessed by both the schema name and the raw table name
     expectedColumnMap.put("newcolumn", "newColumn");
+    TestUtils.waitForCondition(aVoid -> {
+      assertNotNull(tableCache.getSchema(SCHEMA_NAME));
+      assertEquals(schemaChangeListener._schemaList.size(), 1);
+      return schemaChangeListener._schemaList.get(0).equals(expectedSchema);
+    }, 10_000L, "Failed to update the schema in the cache");
+    // Schema can be accessed by both the schema name and the raw table name
+    assertEquals(tableCache.getSchema(SCHEMA_NAME), expectedSchema);
     assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
     assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
     assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), 
expectedColumnMap);
-    Assert.assertNotNull(schemaChangeListener._schemaList);
-    Assert.assertEquals(schemaChangeListener._schemaList.size(), 1);
-    
Assert.assertEquals(schemaChangeListener._schemaList.get(0).getSchemaName(), 
SCHEMA_NAME);
 
     // Update the table config and drop the schema name
     tableConfig.getValidationConfig().setSchemaName(null);
     
ControllerTestUtils.getHelixResourceManager().updateTableConfig(tableConfig);
     // Wait for at most 10 seconds for the callback to update the table config 
in the cache
+    // NOTE:
+    // - Table config should never be null during the transitioning
+    // - Table config change listener callback should always contain 1 table 
config
+    // - Verify if the callback is fully done by checking the table config 
change lister because it is the last step of
+    //   the callback handling
+    TestUtils.waitForCondition(aVoid -> {
+      assertNotNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME));
+      assertEquals(tableConfigChangeListener._tableConfigList.size(), 1);
+      return 
tableConfigChangeListener._tableConfigList.get(0).equals(tableConfig);
+    }, 10_000L, "Failed to update the table config in the cache");
     // After dropping the schema name from the table config, schema can only 
be accessed by the schema name, but not by
     // the table name
-    // NOTE: Table config should never be null during the transitioning
-    TestUtils.waitForCondition(
-        aVoid -> 
Objects.requireNonNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME)).equals(tableConfig)
-            && tableCache.getSchema(RAW_TABLE_NAME) == null && 
tableCache.getColumnNameMap(RAW_TABLE_NAME) == null,
-        10_000L, "Failed to update the table config in the cache");
+    assertEquals(tableCache.getTableConfig(OFFLINE_TABLE_NAME), tableConfig);
+    assertNull(tableCache.getSchema(RAW_TABLE_NAME));
+    assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
     assertEquals(tableCache.getActualTableName(MANGLED_RAW_TABLE_NAME), 
RAW_TABLE_NAME);
     assertEquals(tableCache.getActualTableName(MANGLED_OFFLINE_TABLE_NAME), 
OFFLINE_TABLE_NAME);
     assertNull(tableCache.getActualTableName(REALTIME_TABLE_NAME));
     assertEquals(tableCache.getSchema(SCHEMA_NAME), expectedSchema);
     assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
-    Assert.assertNotNull(tableConfigChangeListener._tableConfigList);
-    Assert.assertEquals(tableConfigChangeListener._tableConfigList.size(), 1);
-    
Assert.assertEquals(tableConfigChangeListener._tableConfigList.get(0).getTableName(),
 OFFLINE_TABLE_NAME);
 
     // Remove the table config
     
ControllerTestUtils.getHelixResourceManager().deleteOfflineTable(RAW_TABLE_NAME);
     // Wait for at most 10 seconds for the callback to remove the table config 
from the cache
-    TestUtils.waitForCondition(aVoid -> 
tableCache.getTableConfig(OFFLINE_TABLE_NAME) == null
-            && tableCache.getActualTableName(RAW_TABLE_NAME) == null, 10_000L,
+    // NOTE:
+    // - Verify if the callback is fully done by checking the table config 
change lister because it is the last step of
+    //   the callback handling
+    TestUtils.waitForCondition(aVoid -> 
tableConfigChangeListener._tableConfigList.isEmpty(), 10_000L,
         "Failed to remove the table config from the cache");
+    assertNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME));
+    assertNull(tableCache.getActualTableName(RAW_TABLE_NAME));
     assertEquals(tableCache.getSchema(SCHEMA_NAME), expectedSchema);
     assertEquals(tableCache.getColumnNameMap(SCHEMA_NAME), expectedColumnMap);
     assertNull(tableCache.getSchema(RAW_TABLE_NAME));
     assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
-    Assert.assertNotNull(schemaChangeListener._schemaList);
-    Assert.assertEquals(schemaChangeListener._schemaList.size(), 1);
-    Assert.assertEquals(tableConfigChangeListener._tableConfigList.size(), 0);
 
     // Remove the schema
     ControllerTestUtils.getHelixResourceManager().deleteSchema(schema);
     // Wait for at most 10 seconds for the callback to remove the schema from 
the cache
-    TestUtils.waitForCondition(aVoid -> tableCache.getSchema(SCHEMA_NAME) == 
null, 10_000L,
+    // NOTE:
+    // - Verify if the callback is fully done by checking the schema change 
lister because it is the last step of the
+    //   callback handling
+    TestUtils.waitForCondition(aVoid -> 
schemaChangeListener._schemaList.isEmpty(), 10_000L,
         "Failed to remove the schema from the cache");
+    assertNull(tableCache.getSchema(SCHEMA_NAME));
     assertNull(tableCache.getColumnNameMap(SCHEMA_NAME));
     assertNull(tableCache.getSchema(RAW_TABLE_NAME));
     assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
-    Assert.assertEquals(schemaChangeListener._schemaList.size(), 0);
-    Assert.assertEquals(tableConfigChangeListener._tableConfigList.size(), 0);
+    assertEquals(schemaChangeListener._schemaList.size(), 0);
+    assertEquals(tableConfigChangeListener._tableConfigList.size(), 0);
   }
 
   private static class TestTableConfigChangeListener implements 
TableConfigChangeListener {
-    private List<TableConfig> _tableConfigList;
+    private volatile List<TableConfig> _tableConfigList;
 
     @Override
     public void onChange(List<TableConfig> tableConfigList) {
@@ -198,7 +212,7 @@ public class TableCacheTest {
   }
 
   private static class TestSchemaChangeListener implements 
SchemaChangeListener {
-    private List<Schema> _schemaList;
+    private volatile List<Schema> _schemaList;
 
     @Override
     public void onChange(List<Schema> schemaList) {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java
index abf3573..0400fe0 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java
@@ -24,31 +24,37 @@ import org.apache.pinot.spi.data.Schema;
 
 
 /**
- * An interface for the provider of pinot table configs and schema
+ * An interface for the provider of pinot table configs and schemas.
  */
 public interface PinotConfigProvider {
 
   /**
-   * Gets the tableConfig
+   * Returns the table config for the given table name with type suffix.
    */
   TableConfig getTableConfig(String tableNameWithType);
 
   /**
    * Registers the {@link TableConfigChangeListener} and notifies it whenever 
any changes (addition, update, removal)
-   * to any of the table configs are detected
-   * @return current list of tables
+   * to any of the table configs are detected. If the listener is successfully 
registered,
+   * {@link TableConfigChangeListener#onChange(List)} will be invoked with the 
current table configs.
+   *
+   * @return {@code true} if the listener is successfully registered, {@code 
false} if the listener is already
+   *         registered.
    */
-  List<TableConfig> 
registerTableConfigChangeListener(TableConfigChangeListener 
tableConfigChangeListener);
+  boolean registerTableConfigChangeListener(TableConfigChangeListener 
tableConfigChangeListener);
 
   /**
-   * Gets the schema
+   * Returns the schema for the given raw table name.
    */
   Schema getSchema(String rawTableName);
 
   /**
-   * Registers the {@link SchemaChangeListener} and notifies it whenever any 
changes  (addition, update, removal) to
-   * schemas are detected
-   * @return current list of schemas
+   * Registers the {@link SchemaChangeListener} and notifies it whenever any 
changes (addition, update, removal) to any
+   * of the schemas are detected. If the listener is successfully registered,
+   * {@link SchemaChangeListener#onChange(List)} will be invoked with the 
current schemas.
+   *
+   * @return {@code true} if the listener is successfully registered, {@code 
false} if the listener is already
+   *         registered.
    */
-  List<Schema> registerSchemaChangeListener(SchemaChangeListener 
schemaChangeListener);
+  boolean registerSchemaChangeListener(SchemaChangeListener 
schemaChangeListener);
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to