This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch branch-4.1.1 in repository https://gitbox.apache.org/repos/asf/impala.git
commit 1d7b63102ebc8974e8133c964917ea8052148088 Author: hexianqing <[email protected]> AuthorDate: Mon Sep 26 19:18:24 2022 +0800 IMPALA-11558: Ensure one Kudu client created (FE) for the specified Kudu master addresses Creating Kudu clients is very expensive as each will fetch metadata from the Kudu master, so we have to ensure only one Kudu client is created for a given Kudu master address. The solution is to ensure KuduUtil.getKuduClient creates only one KuduClient for the specified Kudu master addresses by using the 'computeIfAbsent' method of ConcurrentHashMap. Testing: - Manually ran a stress test: scan of a Kudu table, 1000 concurrent queries, and verified the untracked memory - Added concurrent tests for KuduUtil.getKuduClient - Ran the full set of verifications in Impala Public Jenkins Change-Id: I1003556d3afc8e8216142cac4007a4c99046caeb Reviewed-on: http://gerrit.cloudera.org:8080/19046 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Reviewed-on: http://gerrit.cloudera.org:8080/19129 Reviewed-by: Wenzhe Zhou <[email protected]> --- .../main/java/org/apache/impala/util/KuduUtil.java | 13 ++-- .../java/org/apache/impala/util/KuduUtilTest.java | 80 ++++++++++++++++++++++ 2 files changed, 88 insertions(+), 5 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java index 7037c921a..dcaa4da09 100644 --- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java +++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java @@ -87,16 +87,14 @@ public class KuduUtil { * fetching tablet metadata. 
*/ public static KuduClient getKuduClient(String kuduMasters) { - KuduClient client = kuduClients_.get(kuduMasters); - if (client == null) { + KuduClient client = kuduClients_.computeIfAbsent(kuduMasters, k -> { KuduClientBuilder b = new KuduClient.KuduClientBuilder(kuduMasters); b.defaultAdminOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs()); b.defaultOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs()); b.workerCount(KUDU_CLIENT_WORKER_THREAD_COUNT); b.saslProtocolName(BackendConfig.INSTANCE.getKuduSaslProtocolName()); - client = b.build(); - kuduClients_.put(kuduMasters, client); - } + return b.build(); + }); return client; } @@ -479,4 +477,9 @@ public class KuduUtil { kuduPartitionExpr.analyze(analyzer); return kuduPartitionExpr; } + + // Used for test assertions + public static int getkuduClientsSize() { + return kuduClients_.size(); + } } diff --git a/fe/src/test/java/org/apache/impala/util/KuduUtilTest.java b/fe/src/test/java/org/apache/impala/util/KuduUtilTest.java new file mode 100644 index 000000000..192d0db3b --- /dev/null +++ b/fe/src/test/java/org/apache/impala/util/KuduUtilTest.java @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.util; +import static org.junit.Assert.*; + +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.List; + +import org.apache.impala.service.BackendConfig; +import org.apache.impala.thrift.TBackendGflags; +import org.apache.kudu.client.KuduClient; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Unit tests for KuduUtil functions. + */ +public class KuduUtilTest { + private static TBackendGflags origFlags; + + @BeforeClass + public static void setup() { + // The original BackendConfig need to be mocked, we are saving the values here, so + // they can be restored and not break other tests + if (BackendConfig.INSTANCE == null) { + BackendConfig.create(new TBackendGflags()); + } + origFlags = BackendConfig.INSTANCE.getBackendCfg(); + } + + @AfterClass + public static void teardown() { + BackendConfig.create(origFlags); + } + + @Test + public void testGetKuduClient() { + int size = KuduUtil.getkuduClientsSize(); + int concurrent = 5; + CountDownLatch latch = new CountDownLatch(concurrent); + List<KuduClient> clients = new CopyOnWriteArrayList<>(); + for (int i = 0; i < concurrent; i++) { + new Thread() { + public void run() { + KuduClient client = KuduUtil.getKuduClient("master0"); + clients.add(client); + latch.countDown(); + } + }.start(); + } + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + for (int i = 1; i < concurrent; i++) { + assertSame(clients.get(0), clients.get(i)); + } + KuduUtil.getKuduClient("master1"); + assertEquals(size + 2, KuduUtil.getkuduClientsSize()); + } +} \ No newline at end of file
