Repository: phoenix
Updated Branches:
  refs/heads/master ea0a502ce -> 3b30690f6


PHOENIX-1107 Support mutable indexes over replication

Adding a test to ensure that we still have indexes working over replication,
rather than just relying on the fact that it 'just works'.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3b30690f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3b30690f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3b30690f

Branch: refs/heads/master
Commit: 3b30690f6fdb11cc6c5d4bf53ee709b6e97196f7
Parents: ea0a502
Author: Jesse Yates <jya...@apache.org>
Authored: Mon Oct 6 11:50:47 2014 -0700
Committer: Jesse Yates <jya...@apache.org>
Committed: Mon Oct 6 11:53:23 2014 -0700

----------------------------------------------------------------------
 .../index/MutableIndexReplicationIT.java        | 280 +++++++++++++++++++
 1 file changed, 280 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b30690f/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
new file mode 100644
index 0000000..9981ed8
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.*;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.*;
+
+/**
+ * Test that mutable index updates are correctly replicated to a remote cluster
+ * <p>
+ * Code for setUp/teardown copied from org.apache.hadoop.hbase.replication.TestReplicationBase in
+ * HBase 0.98.5
+ * </p>
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class MutableIndexReplicationIT extends BaseTest {
+
+    private static final Log LOG = LogFactory.getLog(MutableIndexReplicationIT.class);
+
+    public static final String SCHEMA_NAME = "";
+    public static final String DATA_TABLE_NAME = "T";
+    public static final String INDEX_TABLE_NAME = "I";
+    public static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T");
+    public static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "I");
+    private static final long REPLICATION_WAIT_TIME_MILLIS = 10000;
+
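+    // JDBC url and Phoenix driver used to connect to the source cluster (utility1)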
+    protected static PhoenixTestDriver driver;
+    private static String URL;
+
+    protected static Configuration conf1 = HBaseConfiguration.create();
+    protected static Configuration conf2;
+
+    protected static ZooKeeperWatcher zkw1;
+    protected static ZooKeeperWatcher zkw2;
+
+    protected static ReplicationAdmin admin;
+
+    protected static HBaseTestingUtility utility1;
+    protected static HBaseTestingUtility utility2;
+    protected static final int REPLICATION_RETRIES = 100;
+
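+    // constants carried over from the HBase TestReplicationBase code this setup is copied from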
+    protected static final byte[] tableName = Bytes.toBytes("test");
+    protected static final byte[] row = Bytes.toBytes("row");
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        setupConfigsAndStartCluster();
+        setupDriver();
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        utility2.shutdownMiniCluster();
+        utility1.shutdownMiniCluster();
+    }
+
+    private static void setupConfigsAndStartCluster() throws Exception {
+        // cluster-1 lives at regular HBase home, so we don't need to change how phoenix handles
+        // lookups
+//        conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
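+        // the settings below use aggressive timings so that log rolling, retries, and
+        // replication happen quickly enough for the test to observe them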
+        // smaller log roll size to trigger more events
+        conf1.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
+        conf1.setInt("replication.source.size.capacity", 10240);
+        conf1.setLong("replication.source.sleepforretries", 100);
+        conf1.setInt("hbase.regionserver.maxlogs", 10);
+        conf1.setLong("hbase.master.logcleaner.ttl", 10);
+        conf1.setInt("zookeeper.recovery.retry", 1);
+        conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
+        conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
+        conf1.setBoolean("dfs.support.append", true);
+        conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+        conf1.setInt("replication.stats.thread.period.seconds", 5);
+        conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+
+        // add the phoenix-specific configs
+        BaseTest.setUpConfigForMiniCluster(conf1);
+
+        utility1 = new HBaseTestingUtility(conf1);
+        utility1.startMiniZKCluster();
+        MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+        // Have to reset conf1 in case zk cluster location different
+        // than default
+        conf1 = utility1.getConfiguration();
+        zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null, true);
+        admin = new ReplicationAdmin(conf1);
+        LOG.info("Setup first Zk");
+
+        // Base conf2 on conf1 so it gets the right zk cluster, and general cluster configs
+        conf2 = HBaseConfiguration.create(conf1);
+        conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+        conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+        conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
+        conf2.setBoolean("dfs.support.append", true);
+        conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+
+        utility2 = new HBaseTestingUtility(conf2);
+        utility2.setZkCluster(miniZK);
+        zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true);
+
+        //replicate from cluster 1 -> cluster 2, but not back again
+        admin.addPeer("1", utility2.getClusterKey());
+
+        LOG.info("Setup second Zk");
+        utility1.startMiniCluster(2);
+        utility2.startMiniCluster(2);
+    }
+
+    private static void setupDriver() throws Exception {
+        LOG.info("Setting up phoenix driver");
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(3);
+        // Forces server cache to be used
+        props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2));
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        // Must update config before starting server
+        URL = getLocalClusterUrl(utility1);
+        LOG.info("Connecting driver to "+URL);
+        setUpTestDriver(URL, new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
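+    // register the Phoenix test driver for this cluster's url, but only once and only
+    // if it is actually a test url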
+    protected static void setUpTestDriver(String url, ReadOnlyProps props) throws Exception {
+        if (PhoenixEmbeddedDriver.isTestUrl(url)) {
+            if (driver == null) {
+                driver = initAndRegisterDriver(url, props);
+            }
+        }
+    }
+
+    @Test
+    public void testReplicationWithMutableIndexes() throws Exception {
+        Connection conn = getConnection();
+
+        //create the primary and index tables
+        conn.createStatement().execute(
+                "CREATE TABLE " + DATA_TABLE_FULL_NAME
+                        + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 
VARCHAR)");
+        conn.createStatement().execute(
+                "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + 
DATA_TABLE_FULL_NAME
+                        + " (v1)");
+
+        // make sure that the tables are empty, but reachable
+        String query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
+        ResultSet rs = conn.createStatement().executeQuery(query);
+        assertFalse(rs.next());
+
+        // make sure there is no data in the index table either
+        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
+        rs = conn.createStatement().executeQuery(query);
+        assertFalse(rs.next());
+
+        // create the same tables on the remote cluster, and enable replication on the source tables
+        HBaseAdmin admin = utility1.getHBaseAdmin();
+        HBaseAdmin admin2 = utility2.getHBaseAdmin();
+
+        List<String> dataTables = new ArrayList<String>();
+        dataTables.add(DATA_TABLE_FULL_NAME);
+        dataTables.add(INDEX_TABLE_FULL_NAME);
+        for (String tableName : dataTables) {
+            HTableDescriptor desc = admin.getTableDescriptor(TableName.valueOf(tableName));
+
+            //create it as-is on the remote cluster
+            admin2.createTable(desc);
+
+            LOG.info("Enabling replication on source table: "+tableName);
+            HColumnDescriptor[] cols = desc.getColumnFamilies();
+            assertEquals(1, cols.length);
+            // add the replication scope to the column
+            HColumnDescriptor col = desc.removeFamily(cols[0].getName());
+            col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+            desc.addFamily(col);
+            //disable/modify/enable table so it has replication enabled
+            admin.disableTable(desc.getTableName());
+            admin.modifyTable(tableName, desc);
+            admin.enableTable(desc.getTableName());
+            LOG.info("Replication enabled on source table: "+tableName);
+        }
+
+        // load some data into the source cluster table
+        PreparedStatement stmt =
+                conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + 
" VALUES(?,?,?)");
+        stmt.setString(1, "a"); // k
+        stmt.setString(2, "x"); // v1 <- has index
+        stmt.setString(3, "1"); // v2
+        stmt.execute();
+        conn.commit();
+
+        // make sure the index is working as expected
+        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
+        rs = conn.createStatement().executeQuery(query);
+        assertTrue(rs.next());
+        assertEquals("x", rs.getString(1));
+        assertFalse(rs.next());
+        conn.close();
+
+        /*
+         Validate that we have replicated the rows to the remote cluster
+        */
+
+        // other table can't be reached through Phoenix right now - would need to change how we
+        // lookup tables. For right now, we just go through an HTable
+        LOG.info("Looking up tables in replication target");
+        TableName[] tables = admin2.listTableNames();
+        HTable remoteTable = new HTable(utility2.getConfiguration(), tables[0]);
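+        // poll the remote table until the replicated edits show up, failing the test
+        // once REPLICATION_RETRIES attempts have been exhausted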
+        for (int i = 0; i < REPLICATION_RETRIES; i++) {
+            if (i >= REPLICATION_RETRIES - 1) {
+                fail("Waited too much time for put replication on table " + 
remoteTable
+                        .getTableDescriptor().getNameAsString());
+            }
+            if (ensureAnyRows(remoteTable)) {
+                break;
+            }
+            LOG.info("Sleeping for " + REPLICATION_WAIT_TIME_MILLIS
+                    + " for edits to get replicated");
+            Thread.sleep(REPLICATION_WAIT_TIME_MILLIS);
+        }
+        remoteTable.close();
+    }
+
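+    // returns true if a raw scan (which also surfaces delete markers and
+    // un-collected versions) finds any rows in the remote table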
+    private boolean ensureAnyRows(HTable remoteTable) throws IOException {
+        Scan scan = new Scan();
+        scan.setRaw(true);
+        ResultScanner scanner = remoteTable.getScanner(scan);
+        boolean found = false;
+        for (Result r : scanner) {
+            LOG.info("got row: " + r);
+            found = true;
+        }
+        scanner.close();
+        return found;
+    }
+
+    private static Connection getConnection() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        return DriverManager.getConnection(URL, props);
+    }
+}
\ No newline at end of file
