nastra commented on code in PR #6698:
URL: https://github.com/apache/iceberg/pull/6698#discussion_r1094817202
##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -439,4 +439,44 @@ public static MetricsReporter loadMetricsReporter(String impl) {
     return reporter;
   }
+
+  /**
+   * Load a custom ClientPool implementation.
+   *
+   * <p>The ClientPool must have a no-arg constructor. If the class implements Configurable, a
+   * Hadoop config will be passed using Configurable.setConf. {@link ClientPool#initialize(Map)} is
+   * called to complete the initialization.
+   *
+   * @param impl ClientPool implementation full class name
+   * @param properties catalog properties
+   * @param conf hadoop configuration if needed
+   * @return initialized ClientPool object
+   * @throws IllegalArgumentException if no-arg constructor not found or error during initialization
+   */
+  public static <C, E extends Exception> ClientPool<C, E> loadClientPool(
+      String impl, Map<String, String> properties, Object conf) {
+    LOG.info("Loading custom client pool implementation: {}", impl);
+    Preconditions.checkNotNull(

Review Comment:
nit: I think it would be good to be slightly more precise here and change this to `Preconditions.checkArgument(null != impl, ...)`

##########
hive-metastore/src/test/java/org/apache/iceberg/hive/TestLoadHiveCatalog.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestLoadHiveCatalog {
+
+  private static TestHiveMetastore metastore;
+
+  @BeforeClass
+  public static void startMetastore() throws Exception {
+    HiveConf hiveConf = new HiveConf(TestLoadHiveCatalog.class);
+    metastore = new TestHiveMetastore();
+    metastore.start(hiveConf);
+  }
+
+  @AfterClass
+  public static void stopMetastore() throws Exception {
+    if (metastore != null) {
+      metastore.stop();
+      metastore = null;
+    }
+  }
+
+  @Test
+  public void testCustomClientPool() throws Exception {
+    Map<String, String> properties =
+        ImmutableMap.of(
+            CatalogProperties.CLIENT_POOL_IMPL, CachedClientPoolWrapper.class.getName());
+    HiveCatalog hiveCatalog =
+        (HiveCatalog)
+            CatalogUtil.loadCatalog(
+                HiveCatalog.class.getName(),
+                CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
+                properties,
+                metastore.hiveConf());
+    Assert.assertTrue(hiveCatalog.clientPool() instanceof CachedClientPoolWrapper);

Review Comment:
```suggestion
    Assertions.assertThat(hiveCatalog.clientPool()).isInstanceOf(CachedClientPoolWrapper.class);
```

##########
hive-metastore/src/test/java/org/apache/iceberg/hive/TestCachedClientPool.java:
##########
@@ -31,11 +38,106 @@ public void testClientPoolCleaner() throws InterruptedException {
     String metastoreUri = hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
     CachedClientPool clientPool = new CachedClientPool(hiveConf, Collections.emptyMap());
     HiveClientPool clientPool1 = clientPool.clientPool();
-    Assert.assertTrue(CachedClientPool.clientPoolCache().getIfPresent(metastoreUri) == clientPool1);
+    Supplier<Object> uri = () -> CachedClientPool.URIElement.of(metastoreUri);
+    Assert.assertTrue(
+        CachedClientPool.clientPoolCache()
+                .getIfPresent(CachedClientPool.toKey(Collections.singletonList(uri)))
+            == clientPool1);
     TimeUnit.MILLISECONDS.sleep(EVICTION_INTERVAL - TimeUnit.SECONDS.toMillis(2));

Review Comment:
this test method takes **23+ seconds**, which is too long IMO for a simple unit test. We might want to decrease the eviction interval for testing (this can be done in a separate PR).

##########
hive-metastore/src/test/java/org/apache/iceberg/hive/TestCachedClientPool.java:
##########
@@ -31,11 +38,106 @@ public void testClientPoolCleaner() throws InterruptedException {
     String metastoreUri = hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
     CachedClientPool clientPool = new CachedClientPool(hiveConf, Collections.emptyMap());
     HiveClientPool clientPool1 = clientPool.clientPool();
-    Assert.assertTrue(CachedClientPool.clientPoolCache().getIfPresent(metastoreUri) == clientPool1);
+    Supplier<Object> uri = () -> CachedClientPool.URIElement.of(metastoreUri);
+    Assert.assertTrue(

Review Comment:
this should check that they are the same instance rather than using `assertTrue`:
```
Assertions.assertThat(
        CachedClientPool.clientPoolCache()
            .getIfPresent(CachedClientPool.toKey(Collections.singletonList(uri))))
    .isSameAs(clientPool1);
```
Same below.

##########
hive-metastore/src/test/java/org/apache/iceberg/hive/TestCachedClientPool.java:
##########
@@ -31,11 +38,106 @@ public void testClientPoolCleaner() throws InterruptedException {
     String metastoreUri = hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
     CachedClientPool clientPool = new CachedClientPool(hiveConf, Collections.emptyMap());
     HiveClientPool clientPool1 = clientPool.clientPool();
-    Assert.assertTrue(CachedClientPool.clientPoolCache().getIfPresent(metastoreUri) == clientPool1);
+    Supplier<Object> uri = () -> CachedClientPool.URIElement.of(metastoreUri);
+    Assert.assertTrue(
+        CachedClientPool.clientPoolCache()
+                .getIfPresent(CachedClientPool.toKey(Collections.singletonList(uri)))
+            == clientPool1);
     TimeUnit.MILLISECONDS.sleep(EVICTION_INTERVAL - TimeUnit.SECONDS.toMillis(2));
     HiveClientPool clientPool2 = clientPool.clientPool();
     Assert.assertTrue(clientPool1 == clientPool2);
     TimeUnit.MILLISECONDS.sleep(EVICTION_INTERVAL + TimeUnit.SECONDS.toMillis(5));
-    Assert.assertNull(CachedClientPool.clientPoolCache().getIfPresent(metastoreUri));
+    Assert.assertNull(
+        CachedClientPool.clientPoolCache()
+            .getIfPresent(CachedClientPool.toKey(Collections.singletonList(uri))));
+  }
+
+  @Test
+  public void testCacheKey() throws Exception {
+    UserGroupInformation current = UserGroupInformation.getCurrentUser();
+    UserGroupInformation foo1 = UserGroupInformation.createProxyUser("foo", current);
+    UserGroupInformation foo2 = UserGroupInformation.createProxyUser("foo", current);
+    UserGroupInformation bar = UserGroupInformation.createProxyUser("bar", current);
+
+    Key key1 =
+        foo1.doAs(
+            (PrivilegedAction<Key>)
+                () ->
+                    CachedClientPool.toKey(
+                        CachedClientPool.extractKeySuppliers("Uri, user_name", hiveConf)));
+    Key key2 =
+        foo2.doAs(
+            (PrivilegedAction<Key>)
+                () ->
+                    CachedClientPool.toKey(
+                        CachedClientPool.extractKeySuppliers("user_name,uri", hiveConf)));
+    Assert.assertEquals("Key elements order shouldn't matter", key1, key2);
+
+    key1 =
+        foo1.doAs(
+            (PrivilegedAction<Key>)
+                () ->
+                    CachedClientPool.toKey(
+                        CachedClientPool.extractKeySuppliers("uri,ugi", hiveConf)));
+    key2 =
+        bar.doAs(
+            (PrivilegedAction<Key>)
+                () ->
+                    CachedClientPool.toKey(
+                        CachedClientPool.extractKeySuppliers("uri,ugi", hiveConf)));
+    Assert.assertNotEquals("Different users are not supposed to be equivalent", key1, key2);
+
+    key2 =
+        foo2.doAs(
+            (PrivilegedAction<Key>)
+                () ->
+                    CachedClientPool.toKey(
+                        CachedClientPool.extractKeySuppliers("uri,ugi", hiveConf)));
+    Assert.assertNotEquals("Different UGI instances are not supposed to be equivalent", key1, key2);
+
+    key1 = CachedClientPool.toKey(CachedClientPool.extractKeySuppliers("uri", hiveConf));
+    key2 = CachedClientPool.toKey(CachedClientPool.extractKeySuppliers("uri,ugi", hiveConf));
+    Assert.assertNotEquals(
+        "Keys with different number of elements are not supposed to be equivalent", key1, key2);
+
+    Configuration conf1 = new Configuration(hiveConf);
+    Configuration conf2 = new Configuration(hiveConf);
+
+    conf1.set("key1", "val");
+    key1 = CachedClientPool.toKey(CachedClientPool.extractKeySuppliers("uri,conf:key1", conf1));
+    key2 = CachedClientPool.toKey(CachedClientPool.extractKeySuppliers("uri,conf:key1", conf2));
+    Assert.assertNotEquals(
+        "Config with different values are not supposed to be equivalent", key1, key2);
+
+    conf2.set("key1", "val");
+    conf2.set("key2", "val");
+    key2 = CachedClientPool.toKey(CachedClientPool.extractKeySuppliers("uri,conf:key2", conf2));
+    Assert.assertNotEquals(
+        "Config with different keys are not supposed to be equivalent", key1, key2);
+
+    key1 = CachedClientPool.toKey(CachedClientPool.extractKeySuppliers("uri,conf:key1,ugi", conf1));
+    key2 = CachedClientPool.toKey(CachedClientPool.extractKeySuppliers("uri,ugi,conf:key1", conf2));
+    Assert.assertEquals("Config with same key/value should be equivalent", key1, key2);
+
+    conf1.set("key2", "val");
+    key1 =
+        CachedClientPool.toKey(
+            CachedClientPool.extractKeySuppliers("uri,conf:key2 ,conf:key1", conf1));
+    key2 =
+        CachedClientPool.toKey(
+            CachedClientPool.extractKeySuppliers("conf:key2,conf:key1,uri", conf2));
+    Assert.assertEquals("Config with same key/value should be equivalent", key1, key2);
+
+    AssertHelpers.assertThrows(
+        "Duplicate key elements should result in an error",
+        ValidationException.class,
+        "URI key element already specified",
+        () -> CachedClientPool.extractKeySuppliers("uri,uri,ugi", hiveConf));
+
+    AssertHelpers.assertThrows(

Review Comment:
we're trying to move people to using AssertJ directly for such things (see also https://github.com/apache/iceberg/pull/6977), so it might be good to do that here as well.

##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -439,4 +439,44 @@ public static MetricsReporter loadMetricsReporter(String impl) {
     return reporter;
   }
+
+  /**
+   * Load a custom ClientPool implementation.
+   *
+   * <p>The ClientPool must have a no-arg constructor. If the class implements Configurable, a
+   * Hadoop config will be passed using Configurable.setConf. {@link ClientPool#initialize(Map)} is
+   * called to complete the initialization.
+   *
+   * @param impl ClientPool implementation full class name
+   * @param properties catalog properties
+   * @param conf hadoop configuration if needed
+   * @return initialized ClientPool object
+   * @throws IllegalArgumentException if no-arg constructor not found or error during initialization
+   */
+  public static <C, E extends Exception> ClientPool<C, E> loadClientPool(

Review Comment:
tests for this should go into `TestCatalogUtil`

##########
hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java:
##########
@@ -87,4 +106,125 @@ public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry)
       throws TException, InterruptedException {
     return clientPool().run(action, retry);
   }
+
+  @VisibleForTesting
+  static Key toKey(List<Supplier<Object>> suppliers) {
+    return Key.of(suppliers.stream().map(Supplier::get).collect(Collectors.toList()));
+  }
+
+  @VisibleForTesting
+  static List<Supplier<Object>> extractKeySuppliers(String cacheKeys, Configuration conf) {
+    URIElement uri = URIElement.of(conf.get(HiveConf.ConfVars.METASTOREURIS.varname, ""));
+    if (cacheKeys == null || cacheKeys.isEmpty()) {
+      return Collections.singletonList(() -> uri);
+    }
+
+    // generate key elements in a certain order, so that the Key instances are comparable
+    Set<KeyElementType> types = Sets.newTreeSet(Comparator.comparingInt(Enum::ordinal));
+    Map<String, String> confElements = Maps.newTreeMap();
+    for (String element : cacheKeys.split(",", -1)) {
+      String trimmed = element.trim();
+      if (trimmed.toLowerCase(Locale.ROOT).startsWith(CONF_ELEMENT_PREFIX)) {
+        String key = trimmed.substring(CONF_ELEMENT_PREFIX.length());
+        ValidationException.check(
+            !confElements.containsKey(key), "Conf key element %s already specified", key);
+        confElements.put(key, conf.get(key));
+      } else {
+        KeyElementType type = KeyElementType.valueOf(trimmed.toUpperCase());
+        switch (type) {
+          case URI:
+          case UGI:
+          case USER_NAME:
+            ValidationException.check(
+                types.add(type), "%s key element already specified", type.name());
+            break;
+          default:
+            throw new ValidationException("Unknown key element %s", trimmed);
+        }
+      }
+    }
+    ImmutableList.Builder<Supplier<Object>> suppliers = ImmutableList.builder();
+    for (KeyElementType type : types) {
+      switch (type) {
+        case URI:
+          suppliers.add(() -> uri);
+          break;
+        case UGI:
+          suppliers.add(
+              () -> {
+                try {
+                  return UserGroupInformation.getCurrentUser();
+                } catch (IOException e) {
+                  throw new UncheckedIOException(e);
+                }
+              });
+          break;
+        case USER_NAME:
+          suppliers.add(
+              () -> {
+                try {
+                  String userName = UserGroupInformation.getCurrentUser().getUserName();
+                  return UserNameElement.of(userName);
+                } catch (IOException e) {
+                  throw new UncheckedIOException(e);
+                }
+              });
+          break;
+        default:
+          throw new RuntimeException("Unexpected key element " + type.name());
+      }
+    }
+    for (String key : confElements.keySet()) {
+      ConfElement element = ConfElement.of(key, confElements.get(key));
+      suppliers.add(() -> element);
+    }
+    return suppliers.build();
+  }
+
+  @Value.Immutable

Review Comment:
I agree that it's a bit weird to have multiple wrapper classes for holding a single string or a single list. I was actually able to rewrite the code without using any of those wrapper classes, and the tests passed.

##########
hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java:
##########
@@ -87,4 +106,125 @@ public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry)
       throws TException, InterruptedException {
     return clientPool().run(action, retry);
   }
+
+  @VisibleForTesting
+  static Key toKey(List<Supplier<Object>> suppliers) {
+    return Key.of(suppliers.stream().map(Supplier::get).collect(Collectors.toList()));
+  }
+
+  @VisibleForTesting
+  static List<Supplier<Object>> extractKeySuppliers(String cacheKeys, Configuration conf) {

Review Comment:
TBH I find the code difficult to reason about, especially given that the cache key is now essentially a `List<Object>`. I was wondering whether it would be possible to build up a String that includes all of the relevant items in string form, something like `uri:<...>_ugi:<...>_username:<...>_conf:<...>`. You'd probably need to use delimiters that are guaranteed to be unique, though (and I also don't know whether the string representation of `UserGroupInformation` would be unique).

-- 
This is an automated message from the Apache Git Service.
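A minimal sketch of the String-key idea from the last comment, under stated assumptions: `StringCacheKeySketch` and `toKey` are hypothetical names (not part of the PR or of Iceberg), and the user name and conf values are passed in as plain Strings rather than read from `UserGroupInformation` and the Hadoop `Configuration`. Instead of searching for a delimiter that can never appear in a value, it length-prefixes every field (`<length>:<chars>`). That is a swapped-in technique, not what the comment proposed, but it sidesteps the delimiter-uniqueness concern. UGI is left out because it is unclear whether its string form is unique.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch of a String-based cache key (not part of the PR or of Iceberg). Every field
 * is length-prefixed, so values may contain ':' or '_' without two different inputs ever producing
 * the same key. UGI is intentionally not included.
 */
public final class StringCacheKeySketch {

  private StringCacheKeySketch() {}

  // Writes one self-delimiting field: "-" for null, otherwise "<length>:<chars>".
  // A non-null field always starts with a digit, so "-" cannot be confused with one.
  private static void field(StringBuilder sb, String value) {
    if (value == null) {
      sb.append('-');
    } else {
      sb.append(value.length()).append(':').append(value);
    }
  }

  /**
   * Builds the key from the metastore URI, an optional user name (null means "not part of the
   * key") and the selected conf entries (null values allowed). Conf entries are copied into a
   * TreeMap so that the order in which they were listed does not affect the key.
   */
  public static String toKey(String uri, String userName, Map<String, String> confElements) {
    StringBuilder sb = new StringBuilder();
    field(sb, "uri");
    field(sb, uri);
    if (userName != null) {
      field(sb, "user_name");
      field(sb, userName);
    }
    for (Map.Entry<String, String> entry : new TreeMap<>(confElements).entrySet()) {
      field(sb, "conf");
      field(sb, entry.getKey());
      field(sb, entry.getValue());
    }
    return sb.toString();
  }
}
```

For example, `toKey("u", "foo", Map.of())` returns `3:uri1:u9:user_name3:foo`, and conf entries supplied in a different order yield the same key. The trade-off is that the key is harder to read than the `uri:<...>_ugi:<...>` form suggested above.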