thrift metastore client
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/84955eb1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/84955eb1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/84955eb1 Branch: refs/heads/griffin-0.2.0-incubating-rc4 Commit: 84955eb129604cc69aa15f8e6a51f9fb4ec4773c Parents: 5a7793d Author: Lionel Liu <[email protected]> Authored: Tue Apr 24 08:41:36 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Tue Apr 24 08:41:36 2018 +0800 ---------------------------------------------------------------------- .../metastore/hive/ThriftMetastoreClient.java | 250 +++++++++++++++++++ 1 file changed, 250 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/84955eb1/service/src/main/java/org/apache/griffin/core/metastore/hive/ThriftMetastoreClient.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/metastore/hive/ThriftMetastoreClient.java b/service/src/main/java/org/apache/griffin/core/metastore/hive/ThriftMetastoreClient.java new file mode 100644 index 0000000..90f4739 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/metastore/hive/ThriftMetastoreClient.java @@ -0,0 +1,250 @@ + +/** + * Copyright (C) 2015-2017 The Apache Software Foundation and Expedia Inc. + * + * This code is based on Hive's HiveMetaStoreClient: + * + * https://github.com/apache/hive/blob/rel/release-2.1.0/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.griffin.core.metastore.hive; + + import java.io.Closeable; + import java.io.IOException; + import java.net.URI; + import java.util.Random; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicInteger; + + import org.apache.hadoop.hive.conf.HiveConf; + import org.apache.hadoop.hive.conf.HiveConf.ConfVars; + import org.apache.hadoop.hive.conf.HiveConfUtil; + import org.apache.hadoop.hive.metastore.MetaStoreUtils; + import org.apache.hadoop.hive.metastore.api.MetaException; + import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore; + import org.apache.hadoop.hive.shims.ShimLoader; + import org.apache.hadoop.hive.shims.Utils; + import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; + import org.apache.hadoop.util.StringUtils; + import org.apache.thrift.TException; + import org.apache.thrift.protocol.TBinaryProtocol; + import org.apache.thrift.protocol.TCompactProtocol; + import org.apache.thrift.protocol.TProtocol; + import org.apache.thrift.transport.TFramedTransport; + import org.apache.thrift.transport.TSocket; + import org.apache.thrift.transport.TTransport; + import org.apache.thrift.transport.TTransportException; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + +class ThriftMetastoreClient implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(ThriftMetastoreClient.class); + + private static final AtomicInteger CONN_COUNT = new AtomicInteger(0); + + private ThriftHiveMetastore.Iface client = null; + private TTransport transport = null; + private boolean isConnected = false; + private URI metastoreUris[]; + private String tokenStrForm; + protected final HiveConf conf; + + // for thrift connects + private int retries = 5; + private long retryDelaySeconds = 0; + + public ThriftMetastoreClient(HiveConf conf) { + this.conf = conf; + String msUri = conf.getVar(ConfVars.METASTOREURIS); + + if (HiveConfUtil.isEmbeddedMetaStore(msUri)) { + throw new RuntimeException("You can't waggle an embedded metastore"); + } + + // get the number retries + retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES); + retryDelaySeconds = conf.getTimeVar(ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS); + + // user wants file store based configuration + if (msUri != null) { + String metastoreUrisString[] = msUri.split(","); + metastoreUris = new URI[metastoreUrisString.length]; + try { + int i = 0; + for (String s : metastoreUrisString) { + URI tmpUri = new URI(s); + if (tmpUri.getScheme() == null) { + throw new IllegalArgumentException("URI: " + s + " does not have a scheme"); + } + metastoreUris[i++] = tmpUri; + } + } catch (IllegalArgumentException e) { + throw (e); + } catch (Exception e) { + String exInfo = "Got exception: " + e.getClass().getName() + " " + e.getMessage(); + LOG.error(exInfo, e); + throw new RuntimeException(exInfo, e); + } + } else { + LOG.error("NOT getting uris from conf"); + throw new RuntimeException("MetaStoreURIs not found in conf file"); + } + } + + public void open() { + if (isConnected) { + return; + } + TTransportException tte = null; + boolean useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL); + boolean useFramedTransport = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT); + boolean useCompactProtocol = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_COMPACT_PROTOCOL); + int clientSocketTimeout = (int) conf.getTimeVar(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS); + + for (int attempt = 0; !isConnected && attempt < retries; ++attempt) { + for (URI store : metastoreUris) { + LOG.info("Trying to connect to metastore with URI " + store); + try { + transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout); + if (useSasl) { + // Wrap thrift connection with SASL for secure connection. + try { + HadoopThriftAuthBridge.Client authBridge = ShimLoader.getHadoopThriftAuthBridge().createClient(); + + // check if we should use delegation tokens to authenticate + // the call below gets hold of the tokens if they are set up by hadoop + // this should happen on the map/reduce tasks if the client added the + // tokens into hadoop's credential store in the front end during job + // submission. + String tokenSig = conf.getVar(ConfVars.METASTORE_TOKEN_SIGNATURE); + // tokenSig could be null + tokenStrForm = Utils.getTokenStrForm(tokenSig); + if (tokenStrForm != null) { + // authenticate using delegation tokens via the "DIGEST" mechanism + transport = authBridge.createClientTransport(null, store.getHost(), "DIGEST", tokenStrForm, transport, + MetaStoreUtils.getMetaStoreSaslProperties(conf)); + } else { + String principalConfig = conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL); + transport = authBridge.createClientTransport(principalConfig, store.getHost(), "KERBEROS", null, + transport, MetaStoreUtils.getMetaStoreSaslProperties(conf)); + } + } catch (IOException ioe) { + LOG.error("Couldn't create client transport", ioe); + throw new MetaException(ioe.toString()); + } + } else if (useFramedTransport) { + transport = new TFramedTransport(transport); + } + final TProtocol protocol; + if (useCompactProtocol) { + protocol = new TCompactProtocol(transport); + } else { + protocol = new TBinaryProtocol(transport); + } + client = new ThriftHiveMetastore.Client(protocol); + try { + transport.open(); + LOG.info("Opened a connection to metastore '" + + store + + "', total current connections to all metastores: " + + CONN_COUNT.incrementAndGet()); + + isConnected = true; + } catch (TTransportException e) { + tte = e; + if (LOG.isDebugEnabled()) { + LOG.warn("Failed to connect to the MetaStore Server...", e); + } else { + // Don't print full exception trace if DEBUG is not on. + LOG.warn("Failed to connect to the MetaStore Server..."); + } + } + } catch (MetaException e) { + LOG.error("Unable to connect to metastore with URI " + store + " in attempt " + attempt, e); + } + if (isConnected) { + break; + } + } + // Wait before launching the next round of connection retries. + if (!isConnected && retryDelaySeconds > 0) { + try { + LOG.info("Waiting " + retryDelaySeconds + " seconds before next connection attempt."); + Thread.sleep(retryDelaySeconds * 1000); + } catch (InterruptedException ignore) {} + } + } + + if (!isConnected) { + throw new RuntimeException("Could not connect to meta store using any of the URIs provided. Most recent failure: " + + StringUtils.stringifyException(tte)); + } + LOG.info("Connected to metastore."); + } + + public void reconnect() { + close(); + // Swap the first element of the metastoreUris[] with a random element from the rest + // of the array. Rationale being that this method will generally be called when the default + // connection has died and the default connection is likely to be the first array element. + promoteRandomMetaStoreURI(); + open(); + } + + @Override + public void close() { + if (!isConnected) { + return; + } + isConnected = false; + try { + if (client != null) { + client.shutdown(); + } + } catch (TException e) { + LOG.debug("Unable to shutdown metastore client. Will try closing transport directly.", e); + } + // Transport would have got closed via client.shutdown(), so we don't need this, but + // just in case, we make this call. + if (isOpen()) { + transport.close(); + transport = null; + } + LOG.info("Closed a connection to metastore, current connections: " + CONN_COUNT.decrementAndGet()); + } + + public boolean isOpen() { + return transport != null && transport.isOpen(); + } + + protected ThriftHiveMetastore.Iface getClient() { + return client; + } + + /** + * Swaps the first element of the metastoreUris array with a random element from the remainder of the array. + */ + private void promoteRandomMetaStoreURI() { + if (metastoreUris.length <= 1) { + return; + } + Random rng = new Random(); + int index = rng.nextInt(metastoreUris.length - 1) + 1; + URI tmp = metastoreUris[0]; + metastoreUris[0] = metastoreUris[index]; + metastoreUris[index] = tmp; + } + +} \ No newline at end of file
