add version info & remove unnecessary file
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/28460bd8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/28460bd8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/28460bd8 Branch: refs/heads/master Commit: 28460bd8e3c86c934617fcebf1ed1594f7543847 Parents: e7b1691 Author: mashengchen <[email protected]> Authored: Wed Mar 2 02:46:34 2016 +0000 Committer: mashengchen <[email protected]> Committed: Wed Mar 2 02:46:34 2016 +0000 ---------------------------------------------------------------------- .../hbase/client/transactional/HBaseDCZK.java | 1111 ------------------ .../hbase/client/transactional/PeerInfo.java | 226 ---- .../hbase/client/transactional/STRConfig.java | 426 ------- .../client/transactional/XDCStatusWatcher.java | 207 ---- .../transactional/TrxRegionEndpoint.java.tmpl | 70 +- .../transactional/TrxRegionObserver.java.tmpl | 25 +- core/sql/pom.xml.apache | 19 +- core/sql/pom.xml.hdp | 21 +- 8 files changed, 48 insertions(+), 2057 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/28460bd8/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/HBaseDCZK.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/HBaseDCZK.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/HBaseDCZK.java deleted file mode 100644 index 2861f87..0000000 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/HBaseDCZK.java +++ /dev/null @@ -1,1111 +0,0 @@ -// @@@ START COPYRIGHT @@@ -// -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// -// @@@ END COPYRIGHT @@@ - -package org.apache.hadoop.hbase.client.transactional; - -import java.io.IOException; - -import java.nio.charset.Charset; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Scanner; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.hadoop.conf.Configuration; - -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.ServerName; - -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.*; - -import org.apache.hadoop.hbase.client.transactional.STRConfig; - -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.CreateMode; - -import org.apache.hadoop.hbase.client.transactional.PeerInfo; -import org.apache.hadoop.hbase.client.transactional.XDCStatusWatcher; - -/** - * Multi Data Center specific Zookeeper operations - */ -public class HBaseDCZK implements Abortable 
{ - - static final Log LOG = LogFactory.getLog(HBaseDCZK.class); - - private static final Charset CHARSET = Charset.forName("UTF-8"); - - public static final String m_my_cluster_id_string = "my_cluster_id"; - public static final String m_lock_string = "lock"; - public static final String m_clusters_string = "clusters"; - public static final String m_quorum_string = "quorum"; - public static final String m_port_string = "port"; - public static final String m_status_string = "status"; - public static final String m_trafodion_up_string = "trafodion_up"; - - public static final String m_base_node = "/hbase/Trafodion/multi_dc"; - public static final String m_clusters_node = m_base_node + "/" + m_clusters_string; - public static final String m_my_cluster_id_node = m_base_node + "/" + m_my_cluster_id_string; - public static final String m_lock_node = m_base_node + "/" + m_lock_string; - - private ZooKeeperWatcher m_zkw; - private Configuration m_config; - - /** - * @param conf - * @throws Exception - */ - public HBaseDCZK(final Configuration pv_config) - throws InterruptedException, KeeperException, IOException - { - if (LOG.isTraceEnabled()) LOG.trace("HBaseDCZK(conf) -- ENTRY"); - m_config = pv_config; - this.m_zkw = new ZooKeeperWatcher(m_config, "DC", this, false); - } - - /** - * @param znode - * @return znode data - * @throws KeeperException - */ - private byte [] get_znode_data (String znode) - throws KeeperException, InterruptedException - { - try { - return ZKUtil.getData(m_zkw, znode); - } - catch(KeeperException.NoNodeException ke) { - LOG.debug(m_zkw.prefix("Unable to list children of znode " - + znode + " because node does not exist (not an error)")); - return null; - } - } - - /** - * @param znode - * @return List<String> of children nodes - * @throws KeeperException - */ - private List<String> get_znode_children(String pv_node) - throws KeeperException - { - return ZKUtil.listChildrenNoWatch(m_zkw, pv_node); - } - - /** - * @param toDelete - * @throws 
KeeperException - */ - public void delete_cluster_entry(String pv_cluster_id ) - throws KeeperException - { - LOG.info("delete_cluster_entry -- ENTRY -- key: " + pv_cluster_id); - ZKUtil.deleteNodeFailSilent(m_zkw, m_clusters_node + "/" + pv_cluster_id); - LOG.info("delete_cluster_entry -- EXIT "); - } - - public ZooKeeperWatcher getZKW() - { - return m_zkw; - } - - public void register_status_listener(ZooKeeperListener pv_zkl) - throws IOException, KeeperException - { - if (pv_zkl == null) { - return; - } - - m_zkw.registerListener(pv_zkl); - } - - /** - * @param cluster Id - * @throws IOException - */ - public void watch_status_znode(String pv_cluster_id) - throws IOException, KeeperException - { - - LOG.info("HBaseDCZK:watch_status_znode" - ); - - try { - String lv_cluster_id_node = m_clusters_node + "/" + pv_cluster_id; - String lv_status_node = lv_cluster_id_node + "/" + m_status_string; - ZKUtil.setWatchIfNodeExists(m_zkw, lv_status_node); - } - catch (KeeperException e) { - throw new IOException("HBaseDCZK:watch_status_znode: ZKW. 
Error in setting watch: " - + m_clusters_node + "/" + pv_cluster_id - + " , throwing IOException " + e - ); - } - } - - /** - * @param cluster Id - * @throws IOException - */ - public void watch_clusters_znode() - throws IOException, KeeperException - { - - LOG.info("HBaseDCZK:watch_clusters_znode" - ); - - try { - ZKUtil.setWatchIfNodeExists(m_zkw, m_clusters_node); - } - catch (KeeperException e) { - throw new IOException("HBaseDCZK:watch_clusters_znode: ZKW Unable to create watch on: " - + m_clusters_node - + " , throwing IOException " + e - ); - } - } - - /** - * @throws IOException - */ - public void watch_status_all_clusters() - throws IOException, KeeperException - { - - LOG.info("HBaseDCZK:watch_status_all_clusters" - ); - - Map<Integer, PeerInfo> lv_pi_list = list_clusters(); - - if (lv_pi_list == null) { - return; - } - - for (PeerInfo lv_pi : lv_pi_list.values()) { - watch_status_znode(lv_pi.get_id()); - } - } - - public void watch_all() - throws IOException, KeeperException - { - watch_clusters_znode(); - watch_status_all_clusters(); - } - - /** - * @param cluster Id - * @param quorum - * @param port - * @param status - * @throws IOException - */ - public void set_peer_znode(String pv_cluster_id, - String pv_quorum, - String pv_port, - String pv_status - ) - throws IOException - { - - LOG.info("HBaseDCZK:set_peer_znode: " - + " cluster_id : " + pv_cluster_id - + " quorum: " + pv_quorum - + " port: " + pv_port - + " status: " + pv_status - ); - - try { - - if (this.is_locked(pv_cluster_id)) { - System.out.println("Could not update info of cluster id: " + pv_cluster_id - + " as it is locked."); - - return; - } - - String lv_cluster_id_node = m_clusters_node + "/" + pv_cluster_id; - ZKUtil.createWithParents(m_zkw, lv_cluster_id_node); - - if ((pv_quorum != null) && (pv_quorum.length() > 0)) { - String lv_quorum_node = lv_cluster_id_node + "/" + m_quorum_string; - ZKUtil.createSetData(m_zkw, lv_quorum_node, pv_quorum.getBytes(CHARSET)); - } - - if ((pv_port 
!= null) && (pv_port.length() > 0)) { - String lv_port_node = lv_cluster_id_node + "/" + m_port_string; - ZKUtil.createSetData(m_zkw, lv_port_node, pv_port.getBytes(CHARSET)); - } - - if ((pv_status != null) && (pv_status.length() > 0)) { - String lv_status_node = lv_cluster_id_node + "/" + m_status_string; - ZKUtil.createSetData(m_zkw, lv_status_node, pv_status.getBytes(CHARSET)); - } - - } - catch (KeeperException e) { - throw new IOException("HBaseDCZK:set_peer_znode: ZKW Unable to create peer zNode: " - + m_clusters_node + "/" + pv_cluster_id - + " , throwing IOException " + e - ); - } - } - - /** - * @param cluster Id - * @param quorum - * @param port - * @param status - * @throws IOException - */ - public void set_trafodion_znode(String pv_cluster_id, - String pv_node_id, - String pv_trafodion_status_string - ) - throws IOException - { - - if (pv_cluster_id == null) { - if (LOG.isTraceEnabled()) LOG.trace("set_trafodion_znode, pv_cluster_id is null"); - return; - } - - if (pv_node_id == null) { - if (LOG.isTraceEnabled()) LOG.trace("set_trafodion_znode, pv_node_id is null"); - return; - } - - if (LOG.isInfoEnabled()) LOG.info("HBaseDCZK:set_trafodion_znode: " - + " cluster_id : " + pv_cluster_id - + " node_id : " + pv_node_id - + " node_data : " + pv_trafodion_status_string - ); - - try { - String lv_trafodion_up_node = m_clusters_node - + "/" + pv_cluster_id - + "/" + m_trafodion_up_string; - - ZKUtil.createWithParents(m_zkw, lv_trafodion_up_node); - - String lv_ephemeral_node_id_node = lv_trafodion_up_node - + "/" + pv_node_id; - ZKUtil.createEphemeralNodeAndWatch(m_zkw, - lv_ephemeral_node_id_node, - pv_trafodion_status_string.getBytes(CHARSET)); - } - catch (KeeperException e) { - throw new IOException("HBaseDCZK:set_trafodion_znode: ZKW Unable to create zNode: " - + m_clusters_node - + "/" + pv_cluster_id - + "/" + pv_node_id - + " , throwing IOException " + e - ); - } - } - - /** - * @param cluster Id - * @throws IOException - */ - public PeerInfo 
get_peer_znode(String pv_cluster_id) - throws IOException - { - - LOG.info("HBaseDCZK:get_peer_znode: " - + " cluster_id : " + pv_cluster_id - ); - - String lv_cluster_id_node = m_clusters_node + "/" + pv_cluster_id; - - try { - - byte[] znode_data = get_znode_data(lv_cluster_id_node); - if (znode_data == null) { - return null; - } - - PeerInfo lv_pi = new PeerInfo(); - lv_pi.set_id(pv_cluster_id); - - String lv_quorum_node = lv_cluster_id_node + "/" + m_quorum_string; - lv_pi.set_quorum(get_znode_data(lv_quorum_node)); - - String lv_port_node = lv_cluster_id_node + "/" + m_port_string; - lv_pi.set_port(get_znode_data(lv_port_node)); - - String lv_status_node = lv_cluster_id_node + "/" + m_status_string; - lv_pi.set_status(get_znode_data(lv_status_node)); - - String lv_trafodion_up_node = lv_cluster_id_node + "/" + m_trafodion_up_string; - List<String> lv_list_nodes = get_znode_children(lv_trafodion_up_node); - if ((lv_list_nodes != null) && - (lv_list_nodes.size() > 0)) { - lv_pi.setTrafodionStatus(true); - } - - return lv_pi; - - } - catch (KeeperException e) { - throw new IOException("HBaseDCZK:get_peer_znode: ZKW Unable to get peer zNode: " - + lv_cluster_id_node - + " , throwing IOException " + e - ); - } - catch (InterruptedException e) { - throw new IOException("HBaseDCZK:get_peer_znode: ZKW Unable to get peer zNode: " - + lv_cluster_id_node - + " , throwing IOException " + e - ); - } - - } - - /** - * @param cluster Id - * @throws IOException - */ - public boolean delete_peer_znode(String pv_cluster_id) - throws IOException - { - if (LOG.isTraceEnabled()) LOG.trace("HBaseDCZK:delete_peer_znode: " - + " cluster_id : " + pv_cluster_id - ); - - try { - String lv_myid = get_my_id(); - - if (pv_cluster_id.equals(lv_myid)) { - System.out.println("Cannot delete your own info."); - return false; - } - - Map<Integer, PeerInfo> lv_pi_list = list_clusters(); - - if (lv_pi_list == null) { - return false; - } - - boolean lb_found = false; - for (PeerInfo lv_pi : 
lv_pi_list.values()) { - if (lv_pi.get_id().equals(pv_cluster_id)) { - lb_found = true; - } - } - - if ( ! lb_found ) { - System.out.println("peer: " - + pv_cluster_id - + " does not exist"); - return false; - } - - ZKUtil.deleteNodeRecursively(m_zkw, - m_clusters_node + "/" + pv_cluster_id - ); - - } catch (KeeperException e) { - throw new IOException("HBaseDCZK:delete_peer_znode: ZKW, KeeperException trying to delete: " - + pv_cluster_id - + " , throwing IOException " + e - ); - } catch (InterruptedException e) { - throw new IOException("HBaseDCZK:delete_peer_znode: ZKW, InterruptedException trying to delete: " - + pv_cluster_id - + " , throwing IOException " + e - ); - } - - return true; - - } - - /** - * @param cluster Id - * @throws IOException - */ - public Map<Integer, PeerInfo> list_clusters() - throws KeeperException, IOException - { - LOG.info("HBaseDCZK:list_clusters" - ); - - List<String> peer_node_strings = new ArrayList<String>(); - Map<Integer, PeerInfo> peer_info_list = new HashMap<Integer, PeerInfo>(); - - peer_node_strings = get_znode_children(m_clusters_node); - if (peer_node_strings == null) { - return null; - } - - for (String peer_node_string : peer_node_strings) { - int lv_id = Integer.parseInt(peer_node_string); - PeerInfo lv_pi = get_peer_znode(peer_node_string); - peer_info_list.put(lv_id, lv_pi); - } - - return peer_info_list; - } - - /** - * @param cluster Id - * push_to ZK information from this cluster to the provided cluster - * @throws IOException - */ - public void push_to_cluster(String pv_cluster_id, - PeerInfo pv_pi) - throws KeeperException, IOException - { - LOG.info("HBaseDCZK:push_to_cluster" - + " cluster id: " + pv_cluster_id - + " data: " + pv_pi - ); - - try { - - STRConfig lv_STRConfig = null; - Configuration lv_config = null; - - lv_STRConfig = STRConfig.getInstance(m_config); - lv_config = lv_STRConfig.getPeerConfiguration(Integer.parseInt(pv_cluster_id), false); - if (lv_config == null) { - System.out.println("Peer ID: 
" + pv_cluster_id + " does not exist OR it has not been configured."); - return; - } - - HBaseDCZK lv_zk = new HBaseDCZK(lv_config); - - if (lv_zk.is_locked(pv_cluster_id)) { - System.out.println("Could not push info of cluster id: " + pv_pi.get_id() - + " to cluster id: " + pv_cluster_id - + " as it is locked."); - } - else { - lv_zk.set_peer_znode(pv_pi.get_id(), - pv_pi.get_quorum(), - pv_pi.get_port(), - pv_pi.get_status() - ); - } - } - catch (KeeperException e) { - throw new IOException("HBaseDCZK:push_to_cluster: ZKW, KeeperException: " - + " , throwing IOException " + e - ); - } - catch (InterruptedException e) { - throw new IOException("HBaseDCZK:push_to_cluster: ZKW, InterruptedException: " - + " , throwing IOException " + e - ); - } - catch (Exception e) { - System.out.println("exception: " + e); - System.exit(1); - } - - } - - public void push_to_clusters() - throws KeeperException, IOException - { - LOG.info("HBaseDCZK:push_to_clusters" - ); - - Map<Integer, PeerInfo> lv_pi_list = list_clusters(); - - if (lv_pi_list == null) { - return; - } - - try { - - String lv_my_id = get_my_id(); - PeerInfo lv_my_pi = lv_pi_list.get(Integer.parseInt(lv_my_id)); - - for (PeerInfo lv_pi : lv_pi_list.values()) { - if (lv_pi.get_id().equals(lv_my_id)) { - continue; - } - - push_to_cluster(lv_pi.get_id(), lv_my_pi); - } - } - catch (KeeperException e) { - throw new IOException("HBaseDCZK:push_to_clusters: ZKW, KeeperException: " - + " , throwing IOException " + e - ); - } - catch (InterruptedException e) { - throw new IOException("HBaseDCZK:push_to_clusters: ZKW, InterruptedException: " - + " , throwing IOException " + e - ); - } - - } - - /** - * @param cluster Id - * pulls ZK information from the cluster to the provided cluster - * @throws IOException - */ - public void pull_cluster(String pv_cluster_id) - throws KeeperException, IOException - { - LOG.info("HBaseDCZK:pull_cluster" - + " cluster id: " + pv_cluster_id - ); - - try { - - STRConfig lv_STRConfig = null; - 
Configuration lv_config = null; - - lv_STRConfig = STRConfig.getInstance(m_config); - lv_config = lv_STRConfig.getPeerConfiguration(Integer.parseInt(pv_cluster_id), false); - if (lv_config == null) { - System.out.println("Peer ID: " + pv_cluster_id + " does not exist OR it has not been configured."); - return; - } - - HBaseDCZK lv_zk = new HBaseDCZK(lv_config); - - PeerInfo lv_pi = lv_zk.get_peer_znode(pv_cluster_id); - - this.set_peer_znode(lv_pi.get_id(), - lv_pi.get_quorum(), - lv_pi.get_port(), - lv_pi.get_status()); - } - catch (KeeperException e) { - throw new IOException("HBaseDCZK:pull_cluster: ZKW, KeeperException: " - + " , throwing IOException " + e - ); - } - catch (InterruptedException e) { - throw new IOException("HBaseDCZK:pull_cluster: ZKW, InterruptedException: " - + " , throwing IOException " + e - ); - } - catch (Exception e) { - System.out.println("exception: " + e); - System.exit(1); - } - - } - - // Communicates with each cluster and gets that cluster's config information - public void pull_clusters() - throws KeeperException, IOException - { - LOG.info("HBaseDCZK:pull_clusters" - ); - - try { - - String lv_my_id = get_my_id(); - - if (this.is_locked(lv_my_id)) { - System.out.println("Could not pull info for cluster id: " + lv_my_id - + " as it is locked."); - - return; - } - - Map<Integer, PeerInfo> lv_pi_list = list_clusters(); - - if (lv_pi_list == null) { - return; - } - - for (PeerInfo lv_pi : lv_pi_list.values()) { - if (lv_pi.get_id().equals(lv_my_id)) { - continue; - } - - pull_cluster(lv_pi.get_id()); - } - } - catch (KeeperException e) { - throw new IOException("HBaseDCZK:pull_clusters: ZKW, KeeperException: " - + " , throwing IOException " + e - ); - } - catch (InterruptedException e) { - throw new IOException("HBaseDCZK:pull_clusters: ZKW, InterruptedException: " - + " , throwing IOException " + e - ); - } - } - - /** - * @param my_cluster_id - * @throws IOException - */ - public boolean is_locked(String pv_my_cluster_id) - throws 
IOException - { - LOG.info("HBaseDCZK:is_locked: cluster ID: " + pv_my_cluster_id); - int lv_exists = 0; - - try { - lv_exists = ZKUtil.checkExists(m_zkw, m_lock_node); - } - catch (KeeperException e) { - throw new IOException("HBaseDCZK:is_locked: ZKW Unable to check existence of znode: " - + m_lock_node - + " , throwing IOException " + e - ); - } - - if (lv_exists != -1) { - return true; - } - - return false; - } - - - /** - * @param my_cluster_id - * @throws IOException - */ - public void lock_db(String pv_my_cluster_id) - throws IOException - { - LOG.info("HBaseDCZK:lock_db: cluster ID: " + pv_my_cluster_id); - - try { - ZKUtil.createSetData(m_zkw, m_lock_node, pv_my_cluster_id.getBytes(CHARSET)); - } - catch (KeeperException e) { - throw new IOException("HBaseDCZK:lock_db: ZKW Unable to create zNode: " - + m_lock_node - + " for " + pv_my_cluster_id - + " , throwing IOException " + e - ); - } - } - - /** - * @param my_cluster_id - * @throws IOException - */ - public void unlock_db(String pv_my_cluster_id) - throws IOException - { - LOG.info("HBaseDCZK:unlock_db: cluster ID: " + pv_my_cluster_id); - - try { - ZKUtil.deleteNodeFailSilent(m_zkw, m_lock_node); - } - catch (KeeperException e) { - throw new IOException("HBaseDCZK:unlock_db: ZKW Unable to delete zNode: " - + m_lock_node - + " for " + pv_my_cluster_id - + " , throwing IOException " + e - ); - } - } - - /** - * @param my_cluster_id - * @throws IOException - */ - public void set_my_id(String pv_my_cluster_id) - throws IOException - { - LOG.info("HBaseDCZK:set_my_id: cluster ID: " + pv_my_cluster_id); - - try { - ZKUtil.createSetData(m_zkw, m_my_cluster_id_node, pv_my_cluster_id.getBytes(CHARSET)); - PeerInfo lv_pi = get_peer_znode(pv_my_cluster_id); - if (lv_pi == null) { - set_peer_znode(pv_my_cluster_id, - new String(m_config.get( STRConfig.ZK_QUORUM )), - new String(m_config.get( STRConfig.ZK_PORT )), - new String("hupsup")); - } - } - catch (KeeperException e) { - throw new 
IOException("HBaseDCZK:set_my_id: ZKW Unable to create zNode: " - + pv_my_cluster_id - + " , throwing IOException " + e - ); - } - } - - /** - * @return - */ - public String get_my_id() - throws KeeperException, InterruptedException - { - if (LOG.isTraceEnabled()) LOG.trace("HBaseDCZK:get_my_id"); - - byte[] b_my_cluster_id = get_znode_data(m_my_cluster_id_node); - if (b_my_cluster_id != null) { - String lv_cid = new String(b_my_cluster_id); - if (LOG.isTraceEnabled()) LOG.trace("get_my_id id: " + lv_cid); - return lv_cid; - } - - return null; - } - - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.Abortable#abort(java.lang.String, java.lang.Throwable) - */ - @Override - public void abort(String arg0, Throwable arg1) { - // TODO Auto-generated method stub - - } - - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.Abortable#isAborted() - */ - @Override - public boolean isAborted() { - // TODO Auto-generated method stub - return false; - } - - static void usage() { - - System.out.println("usage:"); - System.out.println("HBaseDCZK [<command> | <options>...]"); - System.out.println("<command> : < -setmyid <id>"); - System.out.println(" : | -getmyid"); - System.out.println(" : | -set <cluster info>"); - System.out.println(" : | -get <id>"); - System.out.println(" : | -list"); - System.out.println(" : | -delete <id>"); - System.out.println(" : | -push "); - System.out.println(" : | -pull "); - System.out.println(" : | -lock "); - System.out.println(" : | -unlock >"); - System.out.println("<options> : [ <peer info> | -h | -v ]"); - System.out.println("<cluster info> : < <cluster id> [ <quorum info> | <port info> | <status info> ]... 
>"); - System.out.println("<cluster id> : -id <id> "); - System.out.println("<quorum info> : -quorum <zookeeper quorum>"); - System.out.println("<port info> : -port <zookeeper client port>"); - System.out.println("<status info> : -status <status>"); - System.out.println("<status> : <STR Status>"); - System.out.println("<STR Status> : <STR Up> (sup)| <STR Down> (sdn)"); - System.out.println("<Trafodion Status>: <Trafodion Up>(tup)| <Trafodion Down> (tdn)"); - System.out.println("<peer info> : -peer <id>"); - System.out.println(" : With this option the command is executed at the specified peer."); - System.out.println(" : (Defaults to the local cluster)"); - System.out.println("<id> : A number between 1 and 100 (inclusive)"); - System.out.println("-h : Help (this output)."); - System.out.println("-v : Verbose output. "); - - } - - public static void main(String [] Args) throws Exception { - - boolean lv_retcode = true; - - boolean lv_verbose = false; - boolean lv_test = false; - int lv_peer_id = 0; - - String lv_my_id = new String(); - String lv_id = new String(); - String lv_quorum = new String(); - String lv_port = new String(); - String lv_status = new String(); - - boolean lv_cmd_set_my_id = false; - boolean lv_cmd_get_my_id = false; - boolean lv_cmd_set = false; - boolean lv_cmd_get = false; - boolean lv_cmd_delete = false; - boolean lv_cmd_lock = false; - boolean lv_cmd_unlock = false; - boolean lv_cmd_list = false; - boolean lv_cmd_push = false; - boolean lv_cmd_pull = false; - - int lv_index = 0; - for (String lv_arg : Args) { - lv_index++; - if (lv_arg.compareTo("-h") == 0) { - usage(); - System.exit(0); - } - if (lv_arg.compareTo("-v") == 0) { - lv_verbose = true; - } - if (lv_arg.compareTo("-t") == 0) { - lv_test = true; - } - else if (lv_arg.compareTo("-peer") == 0) { - lv_peer_id = Integer.parseInt(Args[lv_index]); - if (lv_verbose) System.out.println("Talk to Peer Cluster, ID: " + lv_peer_id); - } - else if (lv_arg.compareTo("-id") == 0) { - lv_id 
=Args[lv_index]; - int lv_integer_id = Integer.parseInt(lv_id); - if ((lv_integer_id < 1) || (lv_integer_id > 100)) { - System.out.println("The id has to be between 1 and 100 (inclusive). Exitting..."); - usage(); - System.exit(1); - } - if (lv_verbose) System.out.println("Cluster ID: " + lv_id); - } - else if (lv_arg.compareTo("-status") == 0) { - lv_status =Args[lv_index]; - if (lv_verbose) System.out.println("Status: " + lv_status); - } - else if (lv_arg.compareTo("-quorum") == 0) { - lv_quorum =Args[lv_index]; - if (lv_verbose) System.out.println("Quorum: " + lv_quorum); - } - else if (lv_arg.compareTo("-port") == 0) { - lv_port =Args[lv_index]; - if (lv_verbose) System.out.println("Port: " + lv_port); - } - else if (lv_arg.compareTo("-setmyid") == 0) { - if (lv_verbose) System.out.print("Command: setmyid:"); - lv_my_id =Args[lv_index]; - if ((lv_my_id != null) && (lv_my_id.length() > 0)) { - if (lv_verbose) System.out.println(lv_my_id); - int lv_integer_id = Integer.parseInt(lv_my_id); - if ((lv_integer_id < 1) || (lv_integer_id > 100)) { - System.out.println("The id has to be between 1 and 100 (inclusive). Exitting..."); - usage(); - System.exit(1); - } - lv_cmd_set_my_id = true; - } - else { - System.out.println("Id not provided. 
Exitting..."); - System.exit(1); - } - } - else if (lv_arg.compareTo("-getmyid") == 0) { - if (lv_verbose) System.out.println("Command: getmyid"); - lv_cmd_get_my_id = true; - } - else if (lv_arg.compareTo("-set") == 0) { - if (lv_verbose) System.out.println("Command: set"); - lv_cmd_set = true; - } - else if (lv_arg.compareTo("-get") == 0) { - if (lv_verbose) System.out.println("Command: get"); - lv_cmd_get = true; - if (Args.length > lv_index) { - if (lv_verbose) System.out.println("Args.length: " + Args.length - + " lv_index: " + lv_index); - lv_id = Args[lv_index]; - } - if ((lv_id != null) && (lv_id.length() > 0)) { - if (lv_verbose) System.out.println("Provided id: " + lv_id); - } - else { - System.out.println("Id not provided. Getting this cluster's info..."); - } - } - else if (lv_arg.compareTo("-delete") == 0) { - if (lv_verbose) System.out.println("Command: delete"); - if (Args.length > lv_index) { - lv_id =Args[lv_index]; - } - if ((lv_id != null) && (lv_id.length() > 0)) { - lv_cmd_delete = true; - } - else { - System.out.println("Id not provided. 
Exitting..."); - System.exit(1); - } - } - else if (lv_arg.compareTo("-list") == 0) { - if (lv_verbose) System.out.println("Command: list"); - lv_cmd_list = true; - } - else if (lv_arg.compareTo("-lock") == 0) { - if (lv_verbose) System.out.println("Command: lock"); - lv_cmd_lock = true; - } - else if (lv_arg.compareTo("-unlock") == 0) { - if (lv_verbose) System.out.println("Command: unlock"); - lv_cmd_unlock = true; - } - else if (lv_arg.compareTo("-push") == 0) { - if (lv_verbose) System.out.println("Command: push"); - lv_cmd_push = true; - } - else if (lv_arg.compareTo("-pull") == 0) { - if (lv_verbose) System.out.println("Command: pull"); - lv_cmd_pull = true; - } - } - - STRConfig lv_STRConfig = null; - Configuration lv_config = HBaseConfiguration.create(); - if (lv_peer_id > 0) { - try { - System.setProperty("PEERS", String.valueOf(lv_peer_id)); - lv_STRConfig = STRConfig.getInstance(lv_config); - lv_config = lv_STRConfig.getPeerConfiguration(lv_peer_id, false); - if (lv_config == null) { - System.out.println("Peer ID: " + lv_peer_id + " does not exist OR it has not been configured."); - System.exit(1); - } - } - catch (ZooKeeperConnectionException zke) { - System.out.println("Zookeeper Connection Exception trying to get STRConfig instance: " + zke); - System.exit(1); - } - catch (IOException ioe) { - System.out.println("IO Exception trying to get STRConfig instance: " + ioe); - System.exit(1); - } - } - - try { - HBaseDCZK lv_zk = new HBaseDCZK(lv_config); - if (lv_id.length() <= 0) { - lv_id = lv_zk.get_my_id(); - if (lv_verbose) System.out.println("my id: " + lv_id); - } - if (lv_cmd_set_my_id) { - lv_zk.set_my_id(lv_my_id); - if (lv_test) { - lv_zk.watch_all(); - XDCStatusWatcher lv_pw = new XDCStatusWatcher(lv_zk.getZKW()); - lv_pw.setDCZK(lv_zk); - lv_pw.setSTRConfig(lv_STRConfig); - lv_zk.register_status_listener(lv_pw); - - System.out.println("Number of listeners: " + lv_zk.getZKW().getNumberOfListeners()); - Scanner scanner = new Scanner(System.in); 
- System.out.print("Enter any key when done: "); - String userdata = scanner.next(); - } - } - else if (lv_cmd_get_my_id) { - lv_my_id = lv_zk.get_my_id(); - if (lv_my_id != null) { - System.out.println(lv_my_id); - } - else { - System.out.println("0"); - } - } - else if (lv_cmd_set) { - if ((lv_status.compareTo("sup") != 0) && - (lv_status.compareTo("sdn") != 0)) { - System.out.println("Status string can only be sup or sdn"); - System.exit(1); - } - lv_zk.set_peer_znode(lv_id, lv_quorum, lv_port, lv_status); - } - else if (lv_cmd_get) { - if (lv_verbose) System.out.println(lv_id); - PeerInfo lv_pi = lv_zk.get_peer_znode(lv_id); - if (lv_pi != null) { - System.out.println(lv_pi); - } - } - else if (lv_cmd_delete) { - if (lv_verbose) System.out.println(lv_id); - lv_retcode = lv_zk.delete_peer_znode(lv_id); - if (lv_retcode) { - System.out.println("Successfully deleted the info about the peer: " + lv_id); - } - } - else if (lv_cmd_list) { - Map<Integer, PeerInfo> lv_pi_list = lv_zk.list_clusters(); - if (lv_pi_list != null) { - lv_my_id = lv_zk.get_my_id(); - for (PeerInfo lv_pi : lv_pi_list.values()) { - System.out.print(lv_pi); - if ((lv_my_id != null) && - (lv_pi.get_id() != null) && - (lv_pi.get_id().equals(lv_my_id))) { - System.out.print("*"); - if (lv_zk.is_locked(lv_my_id)) System.out.println(":L"); else System.out.println(""); - } - else { - System.out.println(""); - } - } - } - } - else if (lv_cmd_lock) { - lv_my_id = lv_zk.get_my_id(); - if (lv_my_id != null) { - if (lv_verbose) System.out.println(lv_my_id); - } - else { - if (lv_verbose) System.out.println("0"); - } - lv_zk.lock_db(lv_my_id); - } - else if (lv_cmd_unlock) { - lv_my_id = lv_zk.get_my_id(); - if (lv_my_id != null) { - if (lv_verbose) System.out.println(lv_my_id); - } - else { - if (lv_verbose) System.out.println("0"); - } - lv_zk.unlock_db(lv_my_id); - } - else if (lv_cmd_push) { - lv_zk.push_to_clusters(); - } - else if (lv_cmd_pull) { - lv_zk.pull_clusters(); - } - else { - usage(); - } - 
} - catch (Exception e) - { - System.out.println("exception: " + e); - System.exit(1); - } - - if (! lv_retcode) { - System.exit(1); - } - - System.exit(0); - } -} http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/28460bd8/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/PeerInfo.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/PeerInfo.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/PeerInfo.java deleted file mode 100644 index ec0011a..0000000 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/PeerInfo.java +++ /dev/null @@ -1,226 +0,0 @@ -// @@@ START COPYRIGHT @@@ -// -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
-// -// @@@ END COPYRIGHT @@@ - -package org.apache.hadoop.hbase.client.transactional; -import org.apache.zookeeper.KeeperException; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - - -/** - * Multi Data Center specific - */ -public class PeerInfo { - - static final Log LOG = LogFactory.getLog(PeerInfo.class); - - public static final String TRAFODION_UP = "tup"; - public static final String TRAFODION_DOWN = "tdn"; - public static final String HBASE_UP = "hup"; - public static final String HBASE_DOWN = "hdn"; - public static final String STR_UP = "sup"; - public static final String STR_DOWN = "sdn"; - - private String m_id; - private String m_quorum; - private String m_port; - private boolean m_HBaseUp; - private boolean m_TrafodionUp; - private boolean m_STRUp; - - public PeerInfo() { - if (LOG.isTraceEnabled()) LOG.trace("PeerInfo (s,s,s,s) -- ENTRY"); - - m_id = null; - m_quorum = null; - m_port = null; - m_HBaseUp = false; - m_TrafodionUp = false; - m_STRUp = false; - - } - - public PeerInfo(String pv_id, - String pv_quorum, - String pv_port, - String pv_status) - { - if (LOG.isTraceEnabled()) LOG.trace("PeerInfo (s,s,s,s) -- ENTRY"); - - m_id = pv_id; - m_quorum = pv_quorum; - m_port = pv_port; - m_HBaseUp = false; - m_TrafodionUp = false; - m_STRUp = false; - set_internal_status_fields(pv_status); - } - - public String get_id() { - return m_id; - } - - public void set_id(String pv_id) { - m_id = pv_id; - } - - public String get_quorum() { - return m_quorum; - } - - public void set_quorum(String pv_quorum) { - m_quorum = pv_quorum; - } - - public void set_quorum(byte[] pv_quorum) { - if (pv_quorum != null) { - m_quorum = new String(pv_quorum); - } - } - - public String get_port() { - return m_port; - } - - public void set_port(String pv_port) { - m_port = pv_port; - } - - public void set_port(byte[] pv_port) { - if (pv_port != null) { - m_port = new String(pv_port); - } - } - - public 
String get_status() { - StringBuilder lv_sb = new StringBuilder(); - - get_status(lv_sb); - - return lv_sb.toString(); - } - - public void get_status(StringBuilder pv_sb) { - - if (pv_sb == null) { - return; - } - - if (m_TrafodionUp) { - pv_sb.append(TRAFODION_UP); - } - else { - pv_sb.append(TRAFODION_DOWN); - } - pv_sb.append("-"); - - if (m_STRUp) { - pv_sb.append(STR_UP); - } - else { - pv_sb.append(STR_DOWN); - } - - } - - private void set_internal_status_fields(String pv_status) { - if (pv_status.contains(HBASE_UP)) { - m_HBaseUp = true; - } - if (pv_status.contains(HBASE_DOWN)) { - m_HBaseUp = false; - } - if (pv_status.contains(TRAFODION_UP)) { - m_TrafodionUp = true; - } - if (pv_status.contains(TRAFODION_DOWN)) { - m_TrafodionUp = false; - } - if (pv_status.contains(STR_UP)) { - m_STRUp = true; - } - if (pv_status.contains(STR_DOWN)) { - m_STRUp = false; - } - } - - public void set_status(String pv_status) { - set_internal_status_fields(pv_status); - } - - public void set_status(byte[] pv_status) { - if (pv_status != null) { - String lv_status = new String(pv_status); - set_internal_status_fields(lv_status); - } - } - - public void setHBaseStatus(boolean pv_status) { - m_HBaseUp = pv_status; - } - - public void setTrafodionStatus(boolean pv_status) { - m_TrafodionUp = pv_status; - } - - public void setSTRStatus(boolean pv_status) { - m_STRUp = pv_status; - } - - public boolean isHBaseUp() { - return m_HBaseUp; - } - - public boolean isTrafodionUp() { - return m_TrafodionUp; - } - - public boolean isSTRUp() { - return m_STRUp; - } - - public String toString() - { - StringBuilder lv_sb = new StringBuilder(); - lv_sb = lv_sb - .append(m_id) - .append(":") - .append(m_quorum) - .append(":") - .append(m_port) - .append(":"); - - get_status(lv_sb); - - - return lv_sb.toString(); - } - - public static void main(String [] Args) throws Exception - { - PeerInfo lv_peer = new PeerInfo("1", "q", "24000", "sup"); - System.exit(0); - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/28460bd8/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/STRConfig.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/STRConfig.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/STRConfig.java deleted file mode 100644 index 3236e6a..0000000 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/STRConfig.java +++ /dev/null @@ -1,426 +0,0 @@ -// @@@ START COPYRIGHT @@@ -// -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
-// -// @@@ END COPYRIGHT @@@ - -package org.apache.hadoop.hbase.client.transactional; - -import java.io.File; -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.Collection; -import java.util.List; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ConcurrentHashMap; -import java.util.HashMap; - -import org.apache.commons.codec.binary.Hex; - -import org.apache.hadoop.fs.Path; - -import org.apache.hadoop.hbase.ServerName; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.TableNotEnabledException; -import org.apache.hadoop.hbase.TableNotDisabledException; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos; - -import 
org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; -import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.apache.hadoop.hbase.regionserver.BloomType; -import org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; - -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; - -import org.apache.hadoop.hbase.client.transactional.PeerInfo; -import org.apache.hadoop.ipc.RemoteException; - -import org.apache.zookeeper.KeeperException; - -import com.google.protobuf.ByteString; - -/** - * STR Config. 
- */ -public class STRConfig { - - static final Log LOG = LogFactory.getLog(STRConfig.class); - - static final String ZK_QUORUM = "hbase.zookeeper.quorum"; - static final String ZK_PORT = "hbase.zookeeper.property.clientPort"; - - private static boolean sb_replicate = false; - private static Map<Integer, Configuration> peer_configs; - private static Map<Integer, HConnection> peer_connections; - private static Map<Integer, PeerInfo> peer_info_list; - private static HBaseDCZK sv_dc_zk; - private static String sv_my_cluster_id; - private static int sv_peer_count = 0; - private static int sv_trafodion_node_count = -1; - - private static STRConfig s_STRConfig = null; - - private static void add_peer(Configuration pv_config, - int pv_peer_num) - throws InterruptedException, KeeperException, IOException - { - if (LOG.isTraceEnabled()) LOG.trace("Adding config info in the map for cluster id: " + pv_peer_num - + " peer config: " + pv_config.get(ZK_QUORUM)); - peer_configs.put(pv_peer_num, pv_config); - if (LOG.isTraceEnabled()) LOG.trace("Added config info in the peer_configs map for cluster id: " + pv_peer_num); - - try { - HConnection lv_connection = HConnectionManager.createConnection(pv_config); - if (LOG.isTraceEnabled()) LOG.trace("Created connection for peer: " + pv_peer_num - + " connection: " + lv_connection); - peer_connections.put(pv_peer_num, lv_connection); - if (LOG.isTraceEnabled()) LOG.trace("Added connection in the peer_connections map for cluster id: " + pv_peer_num); - } - catch (Exception e) { - LOG.error("Exception while creating the connection: " + e); - e.printStackTrace(); - LOG.error("cause: " + e.getCause()); - } - - if (LOG.isInfoEnabled()) LOG.info("peer#" - + pv_peer_num - + ":zk quorum: " + (peer_configs.get(pv_peer_num)).get(ZK_QUORUM) - + ":zk clientPort: " + (peer_configs.get(pv_peer_num)).get(ZK_PORT) - ); - } - - private static void add_peer(Configuration pv_config, - String pv_peer_num_string, - String pv_quorum, - String pv_port) - 
throws InterruptedException, KeeperException, IOException - { - Configuration lv_config = HBaseConfiguration.create(pv_config); - - lv_config.set(ZK_QUORUM, pv_quorum); - lv_config.set(ZK_PORT, pv_port); - - int lv_peer_num = Integer.parseInt(pv_peer_num_string); - lv_config.setInt("esgyn.cluster.id", lv_peer_num); - - add_peer(lv_config, - lv_peer_num); - - } - - public static void initObjects(Configuration pv_config) - throws InterruptedException, KeeperException, IOException - { - if (pv_config == null) { - return; - } - - pv_config.set("hbase.hregion.impl", "org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegion"); - pv_config.setInt("hbase.client.retries.number", 3); - - peer_configs = new HashMap<Integer, Configuration>(); - peer_connections = new HashMap<Integer, HConnection>(); - - sv_dc_zk = new HBaseDCZK(pv_config); - peer_info_list = sv_dc_zk.list_clusters(); - sv_my_cluster_id = sv_dc_zk.get_my_id(); - if (sv_my_cluster_id == null) { - sv_my_cluster_id = "0"; - } - - if (LOG.isTraceEnabled()) LOG.trace("My cluster id: " + sv_my_cluster_id); - pv_config.setInt("esgyn.cluster.id", Integer.parseInt(sv_my_cluster_id)); - - } - - public static void initClusterConfigsZK(Configuration pv_config) - throws InterruptedException, KeeperException, IOException - { - if (LOG.isTraceEnabled()) LOG.trace("initClusterConfigsZK ENTRY"); - - initObjects(pv_config); - - // Put myself in the list of configurations - add_peer(pv_config, - 0); - - try { - - if (peer_info_list == null) { - if (LOG.isTraceEnabled()) LOG.trace("initClusterConfigsZK: list_clusters returned null"); - return; - } - - for (PeerInfo lv_pi : peer_info_list.values()) { - if (LOG.isTraceEnabled()) LOG.trace("initClusterConfigsZK: " + lv_pi); - - if (lv_pi.get_id().equals(sv_my_cluster_id)) { - continue; - } - - add_peer(pv_config, - lv_pi.get_id(), - lv_pi.get_quorum(), - lv_pi.get_port()); - - sv_peer_count++; - } - } - catch (Exception e) { - LOG.error("Exception while adding peer info 
to the config: " + e); - } - - } - - public PeerInfo getPeerInfo(int pv_cluster_id) { - PeerInfo lv_pi = peer_info_list.get(pv_cluster_id); - - return lv_pi; - } - - public synchronized void setPeerStatus(int pv_cluster_id, - String pv_status) - { - - if (LOG.isTraceEnabled()) LOG.trace("setPeerStatus" - + " cluster id: " + pv_cluster_id - + " status: " + pv_status - ); - - if (pv_status == null) { - return; - } - - PeerInfo lv_pi = getPeerInfo(pv_cluster_id); - if (lv_pi != null) { - boolean previouslySTRUp = lv_pi.isSTRUp(); - lv_pi.set_status(pv_status); - boolean nowSTRUp = lv_pi.isSTRUp(); - if (previouslySTRUp && ! nowSTRUp) { - --sv_peer_count; - } - else if (! previouslySTRUp && nowSTRUp) { - ++sv_peer_count; - } - } - - if (LOG.isTraceEnabled()) LOG.trace("setPeerStatus" - + " peer count: " + sv_peer_count - ); - return; - } - - - public String getPeerStatus(int pv_cluster_id) - { - - if (LOG.isTraceEnabled()) LOG.trace("getPeerStatus" - + " cluster id: " + pv_cluster_id - ); - - PeerInfo lv_pi = getPeerInfo(pv_cluster_id); - if (lv_pi != null) { - return lv_pi.get_status(); - } - return ""; - } - public int getPeerCount() - { - return sv_peer_count; - } - - public Configuration getPeerConfiguration(int pv_cluster_id, boolean pv_STR_should_be_up) - { - if (pv_STR_should_be_up) { - PeerInfo lv_pi = getPeerInfo(pv_cluster_id); - if (lv_pi == null) { - return null; - } - if (! 
lv_pi.isSTRUp()) { - return null; - } - } - - return peer_configs.get(pv_cluster_id); - } - - public Configuration getPeerConfiguration(int pv_cluster_id) - { - boolean lv_STR_should_be_up_flag; - - lv_STR_should_be_up_flag = true; - if (pv_cluster_id == 0) { - lv_STR_should_be_up_flag = false; - } - - return getPeerConfiguration(pv_cluster_id, lv_STR_should_be_up_flag); - } - - public Map<Integer, Configuration> getPeerConfigurations() - { - return peer_configs; - } - - public HConnection getPeerConnection(int pv_peer_id) - { - return peer_connections.get(pv_peer_id); - } - - public Map<Integer, HConnection> getPeerConnections() - { - return peer_connections; - } - - public Map<Integer, PeerInfo> getPeerInfos() - { - return peer_info_list; - } - - public String getMyClusterId() - { - return sv_my_cluster_id; - } - - public int getMyClusterIdInt() - { - return Integer.parseInt(sv_my_cluster_id); - } - - public static void setTrafodionNodeCount() - { - String lv_trafodion_node_count_string = System.getenv("TRAFODION_NODE_COUNT"); - if (lv_trafodion_node_count_string != null) { - sv_trafodion_node_count = Integer.parseInt(lv_trafodion_node_count_string); - } - else { - sv_trafodion_node_count = 0; - } - LOG.info("TRAFODION_NODE_COUNT = " + sv_trafodion_node_count); - } - - public int getTrafodionNodeCount() - { - return sv_trafodion_node_count; - } - - // getInstance to return the singleton object for TransactionManager - public synchronized static STRConfig getInstance(final Configuration conf) - throws IOException, InterruptedException, KeeperException, ZooKeeperConnectionException - { - if (s_STRConfig == null) { - - s_STRConfig = new STRConfig(conf); - } - return s_STRConfig; - } - - /** - * @param conf - * @throws ZooKeeperConnectionException - */ - private STRConfig(final Configuration conf) - throws InterruptedException, KeeperException, ZooKeeperConnectionException, IOException - - { - setTrafodionNodeCount(); - - initClusterConfigsZK(conf); - - if (sv_dc_zk 
!= null) { - sv_dc_zk.watch_all(); - XDCStatusWatcher lv_pw = new XDCStatusWatcher(sv_dc_zk.getZKW()); - lv_pw.setDCZK(sv_dc_zk); - lv_pw.setSTRConfig(this); - sv_dc_zk.register_status_listener(lv_pw); - } - } - - public String toString() - { - StringBuilder lv_sb = new StringBuilder(); - String lv_str; - lv_str = "Number of peers: " + sv_peer_count; - lv_sb.append(lv_str); - for ( Map.Entry<Integer, Configuration> e : peer_configs.entrySet() ) { - lv_str = "\n======\nID: " + e.getKey() + "\n"; - lv_sb.append(lv_str); - lv_str = ZK_QUORUM + ": " + e.getValue().get(ZK_QUORUM); - lv_sb.append(lv_str); - lv_str = ZK_PORT + ": " + e.getValue().get(ZK_PORT); - lv_sb.append(lv_str); - } - - return lv_sb.toString(); - } - - public static void main(String[] args) { - STRConfig pSTRConfig = null; - Configuration lv_config = HBaseConfiguration.create(); - try { - pSTRConfig = STRConfig.getInstance(lv_config); - } - catch (InterruptedException int_exception) { - System.out.println("Interrupted Exception trying to get STRConfig instance: " + int_exception); - } - catch (IOException ioe) { - System.out.println("IO Exception trying to get STRConfig instance: " + ioe); - } - catch (KeeperException kpe) { - System.out.println("Keeper Exception trying to get STRConfig instance: " + kpe); - } - - System.out.println(pSTRConfig); - } - -} - http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/28460bd8/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/XDCStatusWatcher.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/XDCStatusWatcher.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/XDCStatusWatcher.java deleted file mode 100644 index c34e507..0000000 --- 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/XDCStatusWatcher.java +++ /dev/null @@ -1,207 +0,0 @@ -// @@@ START COPYRIGHT @@@ -// -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// -// @@@ END COPYRIGHT @@@ - -package org.apache.hadoop.hbase.client.transactional; - -import java.io.IOException; - -import java.nio.charset.Charset; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Scanner; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.hadoop.conf.Configuration; - -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.ServerName; - -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.*; - -import org.apache.hadoop.hbase.client.transactional.STRConfig; - -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; 
-import org.apache.zookeeper.CreateMode; - -import org.apache.hadoop.hbase.client.transactional.PeerInfo; -import org.apache.hadoop.hbase.client.transactional.HBaseDCZK; - -/** - * Watcher used to follow the creation and deletion of peer clusters. - */ -public class XDCStatusWatcher extends ZooKeeperListener { - - static final Log LOG = LogFactory.getLog(XDCStatusWatcher.class); - - private HBaseDCZK m_zk; - private STRConfig m_str; - - /** - * Construct a ZooKeeper event listener. - */ - public XDCStatusWatcher(ZooKeeperWatcher pv_watcher) - { - super(pv_watcher); - } - - public void setDCZK(HBaseDCZK pv_zk) - { - m_zk = pv_zk; - } - - public void setSTRConfig(STRConfig pv_str) - { - m_str = pv_str; - } - - /** - * Called when a node has been deleted - * @param path full path of the deleted node - */ - public void nodeDeleted(String path) { - if (LOG.isInfoEnabled()) LOG.info(path + " znode deleted"); - try { - if (m_zk != null) m_zk.watch_all(); - } - catch (Exception e) - { - LOG.error("Exception raised by watch_all: " + e); - } - } - - public PeerInfo getPeerInfo(String path) - { - if (LOG.isTraceEnabled()) LOG.trace("getPeerInfo: " - + path - ); - if ( ! 
path.startsWith(HBaseDCZK.m_clusters_node)) { - return null; - } - - String[] lv_elements = path.split("/"); - int lv_length = lv_elements.length; - if (LOG.isTraceEnabled()) LOG.trace("Number of elements:" + lv_length); - - if (lv_length <= 1) { - return null; - } - - int lv_id_index = lv_length - 2; - String lv_id = lv_elements[lv_id_index]; - if (LOG.isTraceEnabled()) LOG.trace("Node element[" + lv_id_index + "]: " + lv_id); - try { - PeerInfo lv_pi = m_zk.get_peer_znode(lv_id); - if (lv_pi != null) { - if (LOG.isInfoEnabled()) LOG.info("Peer info: " + lv_pi); - return lv_pi; - } - } - catch (Exception e) - { - LOG.error("Exception raised by get_peer_znode: " + e); - } - - return null; - } - - void updateSTRConfig(PeerInfo pv_pi) - { - if (LOG.isInfoEnabled()) LOG.info("updateSTRConfig: PeerInfo: " - + ((pv_pi != null) ? pv_pi :"") - ); - if (m_str == null) { - if (LOG.isInfoEnabled()) LOG.info("updateSTRConfig: STRConfig is null"); - return; - } - - if (pv_pi == null) { - return; - } - - String lv_id = pv_pi.get_id(); - if (lv_id == null) { - return; - } - - int lv_integer_id = Integer.parseInt(lv_id); - m_str.setPeerStatus(lv_integer_id, - pv_pi.get_status()); - - } - - /** - * Called when an existing node has changed data. - * @param path full path of the updated node - */ - public void nodeDataChanged(String path) { - - LOG.info(path + " znode changed"); - - try { - if (m_zk != null) m_zk.watch_all(); - } - catch (Exception e) - { - LOG.error("Exception raised by watch_all: " + e); - } - - if (m_zk != null) { - PeerInfo lv_pi = getPeerInfo(path); - if (lv_pi == null) { - return; - } - - updateSTRConfig(lv_pi); - - } - } - - /** - * Called when an existing node has a child node added or removed. 
- * @param path full path of the node whose children have changed - */ - public void nodeChildrenChanged(String path) { - - try { - if (m_zk != null) m_zk.watch_all(); - } - catch (Exception e) - { - LOG.error("Exception raised by watch_all: " + e); - } - - if (LOG.isInfoEnabled()) LOG.info(path + " znode children changed"); - - } -} http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/28460bd8/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl index 476f761..9c14965 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl @@ -107,7 +107,6 @@ import org.apache.hadoop.hbase.client.transactional.OutOfOrderProtocolException; import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException; import org.apache.hadoop.hbase.client.transactional.BatchException; import org.apache.hadoop.hbase.client.transactional.TransState; -import org.apache.hadoop.hbase.client.transactional.STRConfig; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -419,7 +418,6 @@ CoprocessorService, Coprocessor { public static final String trxkeyEPCPinstance = "EPCPinstance"; // TBD Maybe we should just use HashMap to improve the performance, ConcurrentHashMap could be too strict static ConcurrentHashMap<String, Object> transactionsEPCPMap; - STRConfig pSTRConfig; // TrxRegionService methods 
@Override @@ -2929,12 +2927,6 @@ CoprocessorService, Coprocessor { org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestResponse.Builder recoveryResponseBuilder = RecoveryRequestResponse.newBuilder(); - try { - pSTRConfig = STRConfig.getInstance(config); - } catch (Exception xe) { - LOG.error("An ERROR occurred while getting the STR Configuration"); - } - List<Long> indoubtTransactions = new ArrayList<Long>(); if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: recoveryRequest Trafodion Recovery: region " + regionInfo.getEncodedName() + " receives recovery request from TM " + tmId + " with region state " + regionState); switch(regionState) { @@ -2943,16 +2935,9 @@ CoprocessorService, Coprocessor { if (LOG.isInfoEnabled()) LOG.info("TRAF RCOV:recoveryRequest in region starting" + regionInfo.getEncodedName() + " has in-doubt transaction " + indoubtTransactionsById.size()); for (Entry<Long, List<WALEdit>> entry : indoubtTransactionsById.entrySet()) { long tid = entry.getKey(); - int clusterid = (int) TransactionState.getClusterId(tid); - int nodeid = (int) TransactionState.getNodeId(tid); - - boolean add = false; - if ((clusterid == 0 || (clusterid == pSTRConfig.getMyClusterIdInt())) && nodeid == tmId) add = true; // match local TM - else if (((clusterid != pSTRConfig.getMyClusterIdInt()) && clusterid != 0) && tmId == -2) add = true; // for any peer - - if (add) { + if ((int) (tid >> 32) == tmId) { indoubtTransactions.add(tid); - if (LOG.isInfoEnabled()) LOG.info("TrxRegionEndpoint coprocessor: recoveryRequest - txId " + transactionId + ", Trafodion Recovery: region " + regionInfo.getEncodedName() + " in-doubt transaction " + tid + " has been added into the recovery reply to Cluster " + clusterid + " Node " + nodeid + " TM " + tmId + " during recovery "); + if (LOG.isInfoEnabled()) LOG.info("TrxRegionEndpoint coprocessor: recoveryRequest - txId " + transactionId + ", Trafodion Recovery: region " + 
regionInfo.getEncodedName() + " in-doubt transaction " + tid + " has been added into the recovery reply to TM " + tmId + " during recovery "); } } if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: recoveryRequest " + indoubtTransactions.size()); @@ -2971,22 +2956,10 @@ CoprocessorService, Coprocessor { if (LOG.isInfoEnabled()) LOG.info("TRAF RCOV:recoveryRequest in region started" + regionInfo.getEncodedName() + " has in-doubt transaction " + commitPendingCopy.size()); for (TrxTransactionState commitPendingTS : commitPendingCopy) { long tid = commitPendingTS.getTransactionId(); - int clusterid = (int) TransactionState.getClusterId(tid); - int nodeid = (int) TransactionState.getNodeId(tid); - - boolean add = false; - if (((clusterid == pSTRConfig.getMyClusterIdInt()) || clusterid == 0) && nodeid == tmId) add = true; // match local TM - if (((clusterid != pSTRConfig.getMyClusterIdInt()) && clusterid != 0) && tmId == -2) add = true; // for any peer - - if (add) { + if ((int) (tid >> 32) == tmId) { indoubtTransactions.add(tid); - if (LOG.isInfoEnabled()) { - LOG.info("TrxRegionEndpoint coprocessor: recoveryRequest - Trafodion Recovery: region " - + regionInfo.getEncodedName() + " in-doubt transaction " + tid - + " has been added into the recovery reply to Cluster " + clusterid + " Node " - + nodeid + " TM " + tmId + " during start "); - } - } + if (LOG.isInfoEnabled()) LOG.info("TrxRegionEndpoint coprocessor: recoveryRequest - Trafodion Recovery: region " + regionInfo.getEncodedName() + " in-doubt transaction " + tid + " has been added into the recovery reply to TM " + tmId + " during start "); + } } // now remove the ZK node after TM has initiated the ecovery request String lv_encoded = m_Region.getRegionInfo().getEncodedName(); @@ -3522,12 +3495,6 @@ CoprocessorService, Coprocessor { this.config = tmp_env.getConfiguration(); - try { - pSTRConfig = STRConfig.getInstance(config); - } catch (Exception xe) { - LOG.error("An ERROR occurred while getting 
the STR Configuration"); - } - synchronized (stoppableLock) { try { this.transactionLeaseTimeout = config.getInt(LEASE_CONF, MINIMUM_LEASE_TIME); @@ -3925,7 +3892,7 @@ CoprocessorService, Coprocessor { List<Integer> staleBranchforTMId = new ArrayList<Integer>(); List<TrxTransactionState> commitPendingCopy = new ArrayList<TrxTransactionState>(commitPendingTransactions); Map<Long, List<WALEdit>> indoubtTransactionsMap = new TreeMap<Long, List<WALEdit>>(indoubtTransactionsById); - int tmid, clusterid, tm; + int tmid, tm; // selected printout for CP long currentEpoch = controlPointEpoch.get(); @@ -3943,11 +3910,7 @@ CoprocessorService, Coprocessor { transactionId = entry.getKey(); if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:choreThreadDetectStaleTransactionBranch: indoubt branch Txn id " + transactionId + " region info bytes " + new String(lv_byte_region_info)); - clusterid = (int) TransactionState.getClusterId(transactionId); - tmid = (int) TransactionState.getNodeId(transactionId); - LOG.info("Traf Reco Thread detect stale branch tid " + transactionId + " cluster id " + clusterid + " node " + tmid + " PSTRConfig " + pSTRConfig.getMyClusterId()); - if ((clusterid != pSTRConfig.getMyClusterIdInt()) && clusterid != 0) tmid = -2; // for any peer - + tmid = (int) (transactionId >> 32); if (!staleBranchforTMId.contains(tmid)) {staleBranchforTMId.add(tmid);} } } @@ -3957,11 +3920,7 @@ CoprocessorService, Coprocessor { transactionId = commitPendingTS.getTransactionId(); if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:choreThreadDetectStaleTransactionBranch: stale branch Txn id " + transactionId + " region info bytes " + new String(lv_byte_region_info)); - clusterid = (int) TransactionState.getClusterId(transactionId); - tmid = (int) TransactionState.getNodeId(transactionId); - LOG.info("Traf Reco Thread detect stale branch tid " + transactionId + " cluster id " + clusterid + " node " + tmid + " PSTRConfig " + 
pSTRConfig.getMyClusterId()); - if ((clusterid != pSTRConfig.getMyClusterIdInt()) && clusterid != 0) tmid = -2; // for any peer - + tmid = (int) (transactionId >> 32); if (!staleBranchforTMId.contains(tmid)) {staleBranchforTMId.add(tmid);} } } @@ -4336,9 +4295,7 @@ CoprocessorService, Coprocessor { if (state.isReinstated()) { synchronized(indoubtTransactionsById) { indoubtTransactionsById.remove(state.getTransactionId()); - int clusterid = (int) TransactionState.getClusterId(transactionId); - int tmid = (int) TransactionState.getNodeId(transactionId); - if ((clusterid != pSTRConfig.getMyClusterIdInt()) && (clusterid != 0)) tmid = -2; // for any peer + int tmid = (int) (transactionId >> 32); int count = 0; if (indoubtTransactionsCountByTmid.containsKey(tmid)) { count = (int) indoubtTransactionsCountByTmid.get(tmid) - 1; @@ -4891,9 +4848,8 @@ CoprocessorService, Coprocessor { //throw exp; } if (LOG.isTraceEnabled()) LOG.trace("TrxRegion endpoint CP: reconstruct transaction: rewrite to HLOG CR edit for transaction " + transactionId); - int clusterid = (int) TransactionState.getClusterId(transactionId); - int tmid = (int) TransactionState.getNodeId(transactionId); - if (LOG.isTraceEnabled()) LOG.trace("TrxRegion endpoint CP " + regionInfo.getRegionNameAsString() + " reconstruct transaction " + transactionId + " for Cluster " + clusterid + " TM " + tmid); + int tmid = (int) (transactionId >> 32); + if (LOG.isTraceEnabled()) LOG.trace("TrxRegion endpoint CP " + regionInfo.getRegionNameAsString() + " reconstruct transaction " + transactionId + " for TM " + tmid); } // for all txns in indoubt transcation list } // not reconstruct indoubtes yet reconstructIndoubts = 1; @@ -5492,9 +5448,7 @@ CoprocessorService, Coprocessor { synchronized(indoubtTransactionsById) { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: Trafodion Recovery: abort reinstated indoubt transactions " + transactionId); indoubtTransactionsById.remove(state.getTransactionId()); - int 
clusterid = (int) TransactionState.getClusterId(transactionId); - int tmid = (int) TransactionState.getNodeId(transactionId); - if ((clusterid != pSTRConfig.getMyClusterIdInt()) && (clusterid != 0)) tmid = -2; // for any peer + int tmid = (int) (transactionId >> 32); int count = 0; // indoubtTransactionsCountByTmid protected by http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/28460bd8/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java.tmpl ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java.tmpl b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java.tmpl index aa27db4..7de9391 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java.tmpl +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java.tmpl @@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionScannerHolder; import org.apache.hadoop.hbase.regionserver.transactional.TrxTransactionState; import org.apache.hadoop.hbase.regionserver.transactional.TransactionState; -import org.apache.hadoop.hbase.client.transactional.STRConfig; import org.apache.hadoop.hbase.wal.WAL; //import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; @@ -178,7 +177,6 @@ private static ZooKeeperWatcher zkw1 = null; private static Object zkRecoveryCheckLock = new Object(); SplitBalanceHelper sbHelper; -STRConfig pSTRConfig; // Region Observer Coprocessor START @Override public void start(CoprocessorEnvironment e) throws IOException { @@ 
-538,35 +536,20 @@ public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) { //for each indoubt transaction from pendingTransactionsById, build related transaction state object and add it into required lists for endPoint //build a list of TMs for in-doubt transactions, -2 is used for all peer's transactions - try { - pSTRConfig = STRConfig.getInstance(my_config); - } catch (Exception xe) { - LOG.error("An ERROR occurred while getting the STR Configuration"); - } for (Entry<Long, List<WALEdit>> entry : pendingTransactionsById.entrySet()) { synchronized (recoveryCheckLock) { long transactionId = entry.getKey(); String key = String.valueOf(transactionId); if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("Trafodion Recovery Region Observer CP: Region " + regionInfo.getRegionNameAsString() + " process in-doubt transaction " + transactionId); - - int clusterid = (int) TransactionState.getClusterId(transactionId); - int tmid = (int) TransactionState.getNodeId(transactionId); + int tmid = (int) (transactionId >> 32); int count = 1; - if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("Trafodion Recovery Region Observer CP: Region " + regionInfo.getRegionNameAsString() + " add prepared " + transactionId + " to TM " + tmid); - if ((clusterid == 0) || (clusterid == pSTRConfig.getMyClusterIdInt())) { - if ((m_isTrafodionMetadata) || LOG.isTraceEnabled()) LOG.info("Trafodion Recovery Region Observer CP: Region " + regionInfo.getRegionNameAsString() + " add local prepared " + transactionId); - } - else { - tmid = -2; // peer transactions is always sent to -1 ZK, which LDTM peer recovery thread will pick up - if ((m_isTrafodionMetadata) || (LOG.isTraceEnabled())) - LOG.info("Trafodion Recovery Region Observer CP: Region " + regionInfo.getRegionNameAsString() - + " add peer prepared " + transactionId); - } + if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: Region " + regionInfo.getRegionNameAsString() + " add 
prepared " + transactionId + " to TM " + tmid); if (indoubtTransactionsCountByTmid.containsKey(tmid)) count = (int) indoubtTransactionsCountByTmid.get(tmid) + 1; indoubtTransactionsCountByTmid.put(tmid, count); - if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: Region " + regionInfo.getRegionNameAsString() + " has " + count + " in-doubt-transaction from TM " + tmid); + if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: Region " + regionInfo.getRegionNameAsString() + " has " + count + + " in-doubt-transaction from TM " + tmid); //TBD may need to write the LOG again for reinstated txn (redo does not generate edits) //since after open, HBase may toss out split-log while there are indoubt list in memory http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/28460bd8/core/sql/pom.xml.apache ---------------------------------------------------------------------- diff --git a/core/sql/pom.xml.apache b/core/sql/pom.xml.apache index c3f9907..6500b66 100644 --- a/core/sql/pom.xml.apache +++ b/core/sql/pom.xml.apache @@ -39,8 +39,21 @@ <build> <defaultGoal>package</defaultGoal> - <outputDirectory>${project.build.directory}/classes_${hbase-flavor-id}</outputDirectory> - <finalName>${project.artifactId}-${project.version}</finalName> + <outputDirectory>${project.build.directory}/classes_${hbase-flavor-id}</outputDirectory> + <finalName>${project.artifactId}-${project.version}</finalName> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifestFile> + src/main/resources/trafodion-sql.jar.mf + </manifestFile> + </archive> + </configuration> + </plugin> + </plugins> </build> <dependencies> @@ -112,7 +125,7 @@ <modelVersion>4.0.0</modelVersion> <artifactId>trafodion-sql-apache1_0_2</artifactId> <version>${env.TRAFODION_VER}</version> - <name>trafodion-sql</name> + <name>trafodion-sql-apache1_0_2</name> <description>Java code 
for SQL engine in Trafodion</description> </project> http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/28460bd8/core/sql/pom.xml.hdp ---------------------------------------------------------------------- diff --git a/core/sql/pom.xml.hdp b/core/sql/pom.xml.hdp index b7e78cf..8a28586 100644 --- a/core/sql/pom.xml.hdp +++ b/core/sql/pom.xml.hdp @@ -39,10 +39,22 @@ <build> <defaultGoal>package</defaultGoal> - <outputDirectory>${project.build.directory}/classes_${hbase-flavor-id}</outputDirectory> - <finalName>${project.artifactId}-${project.version}</finalName> - </build> - + <outputDirectory>${project.build.directory}/classes_${hbase-flavor-id}</outputDirectory> + <finalName>${project.artifactId}-${project.version}</finalName> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifestFile> + src/main/resources/trafodion-sql.jar.mf + </manifestFile> + </archive> + </configuration> + </plugin> + </plugins> + </build> <dependencies> <dependency> <groupId>org.apache.hbase</groupId> @@ -95,5 +107,4 @@ <name>trafodion-sql</name> <description>Java code for SQL engine in Trafodion</description> - </project>
