Repository: phoenix
Updated Branches:
  refs/heads/4.0 2d1e512bb -> 463dabdcb


PHOENIX-938: Use higher priority queue for index updates to prevent deadlock


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/463dabdc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/463dabdc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/463dabdc

Branch: refs/heads/4.0
Commit: 463dabdcb9e3af7cc9182556c8e56fce0caca0de
Parents: 2d1e512
Author: Jesse Yates <jya...@apache.org>
Authored: Wed Jul 23 11:10:52 2014 -0700
Committer: Jesse Yates <jya...@apache.org>
Committed: Wed Jul 23 11:11:23 2014 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/index/IndexHandlerIT.java   | 166 +++++++++++++++++++
 .../hbase/ipc/PhoenixIndexRpcScheduler.java     | 125 ++++++++++++++
 .../phoenix/hbase/index/IndexQosCompat.java     |  98 +++++++++++
 .../index/IndexQosRpcControllerFactory.java     |  86 ++++++++++
 .../hbase/index/builder/IndexBuildManager.java  |   1 -
 .../hbase/index/builder/IndexBuilder.java       |   1 -
 .../ipc/PhoenixIndexRpcSchedulerFactory.java    | 104 ++++++++++++
 .../index/table/CoprocessorHTableFactory.java   |  65 ++++----
 .../java/org/apache/phoenix/util/IndexUtil.java |   6 +
 .../hbase/ipc/PhoenixIndexRpcSchedulerTest.java |  99 +++++++++++
 .../PhoenixIndexRpcSchedulerFactoryTest.java    | 105 ++++++++++++
 pom.xml                                         |   4 +-
 12 files changed, 827 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/463dabdc/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
new file mode 100644
index 0000000..a829ae1
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.IndexQosRpcControllerFactory;
+import org.apache.phoenix.hbase.index.TableName;
+import org.apache.phoenix.hbase.index.ipc.PhoenixIndexRpcSchedulerFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Comprehensive test that ensures we are adding custom index handlers
+ */
+public class IndexHandlerIT {
+
+    public static class CountingIndexClientRpcFactory extends 
RpcControllerFactory {
+
+        private IndexQosRpcControllerFactory delegate;
+
+        public CountingIndexClientRpcFactory(Configuration conf) {
+            super(conf);
+            this.delegate = new IndexQosRpcControllerFactory(conf);
+        }
+
+        @Override
+        public PayloadCarryingRpcController newController() {
+            PayloadCarryingRpcController controller = delegate.newController();
+            return new CountingIndexClientRpcController(controller);
+        }
+
+        @Override
+        public PayloadCarryingRpcController newController(CellScanner 
cellScanner) {
+            PayloadCarryingRpcController controller = 
delegate.newController(cellScanner);
+            return new CountingIndexClientRpcController(controller);
+        }
+
+        @Override
+        public PayloadCarryingRpcController newController(List<CellScannable> 
cellIterables) {
+            PayloadCarryingRpcController controller = 
delegate.newController(cellIterables);
+            return new CountingIndexClientRpcController(controller);
+        }
+    }
+
+    public static class CountingIndexClientRpcController extends
+            DelegatingPayloadCarryingRpcController {
+
+        private static Map<Integer, Integer> priorityCounts = new 
HashMap<Integer, Integer>();
+
+        public CountingIndexClientRpcController(PayloadCarryingRpcController 
delegate) {
+            super(delegate);
+        }
+
+        /**
+         * Record that a call was tagged with the given priority by bumping its
+         * entry in the shared {@code priorityCounts} map.
+         * @param pri priority that was requested for the call
+         */
+        @Override
+        public void setPriority(int pri) {
+            // Map.get returns null (not 0) for a priority we have not seen yet;
+            // comparing the boxed value to 0 would unbox null and throw an NPE
+            Integer count = priorityCounts.get(pri);
+            if (count == null) {
+                count = 0;
+            }
+            priorityCounts.put(pri, count + 1);
+        }
+    }
+
+    private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+    private static final byte[] row = Bytes.toBytes("row");
+    private static final byte[] family = Bytes.toBytes("FAM");
+    private static final byte[] qual = Bytes.toBytes("qual");
+    private static final HColumnDescriptor FAM1 = new 
HColumnDescriptor(family);
+
+    @Rule
+    public TableName TestTable = new TableName();
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        UTIL.startMiniCluster();
+    }
+
+    @AfterClass
+    public static void shutdownCluster() throws Exception {
+        UTIL.shutdownMiniCluster();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        HTableDescriptor desc =
+                new 
HTableDescriptor(org.apache.hadoop.hbase.TableName.valueOf(TestTable
+                        .getTableNameString()));
+        desc.addFamily(FAM1);
+
+        // create the table
+        HBaseAdmin admin = UTIL.getHBaseAdmin();
+        admin.createTable(desc);
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        HBaseAdmin admin = UTIL.getHBaseAdmin();
+        admin.disableTable(TestTable.getTableName());
+        admin.deleteTable(TestTable.getTableName());
+    }
+
+    /**
+     * Write a single row through a client configured with the counting rpc
+     * controller factory and check that exactly one call was tagged with the
+     * default index priority.
+     */
+    @Test
+    public void testClientWritesWithPriority() throws Exception {
+        Configuration conf = new Configuration(UTIL.getConfiguration());
+        // add the keys for our rpc factory
+        conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
+            CountingIndexClientRpcFactory.class.getName());
+        // and set the index table as the current table
+        // NOTE(review): nothing visible in this change reads INDEX_TABLE_NAMES_KEY; the
+        // controller looks up IndexQosCompat.getTableIndexQosConfKey(<table>) instead.
+        // Confirm this is the key the test should be setting.
+        conf.setStrings(IndexQosRpcControllerFactory.INDEX_TABLE_NAMES_KEY,
+            TestTable.getTableNameString());
+        HTable table = new HTable(conf, TestTable.getTableName());
+        try {
+            // do a write to the table
+            Put p = new Put(row);
+            p.add(family, qual, new byte[] { 1, 0, 1, 0 });
+            table.put(p);
+            table.flushCommits();
+
+            // check the counts on the rpc controller
+            assertEquals("Didn't get the expected number of index priority writes!", (int) 1,
+                (int) CountingIndexClientRpcController.priorityCounts
+                        .get(PhoenixIndexRpcSchedulerFactory.DEFAULT_INDEX_MIN_PRIORITY));
+        } finally {
+            // release the client table even when the write or the assertion fails
+            table.close();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/463dabdc/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java
 
b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java
new file mode 100644
index 0000000..8349321
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ipc.BalancedQueueRpcExecutor;
+import org.apache.hadoop.hbase.ipc.CallRunner;
+import org.apache.hadoop.hbase.ipc.RpcExecutor;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * {@link RpcScheduler} that first checks to see if this is an index update 
before passing off the
+ * call to the delegate {@link RpcScheduler}.
+ * <p>
+ * We reserve the range [1000, 1050), by default (though it is configurable), 
for index priority
+ * writes. Currently, we don't do any prioritization within that range - all 
index writes are
+ * treated with the same priority and put into the same queue.
+ */
+public class PhoenixIndexRpcScheduler extends RpcScheduler {
+
+    // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 
0.98.4
+    public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = 
"ipc.server.callqueue.read.share";
+    public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
+            "ipc.server.callqueue.handler.factor";
+    private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
+
+    private RpcScheduler delegate;
+    private int minPriority;
+    private int maxPriority;
+    private RpcExecutor callExecutor;
+
+    public PhoenixIndexRpcScheduler(int indexHandlerCount, Configuration conf,
+            RpcScheduler delegate, int minPriority, int maxPriority) {
+        int maxQueueLength =
+                conf.getInt("ipc.server.max.callqueue.length", 
indexHandlerCount
+                        * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+
+        // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 
0.98.4
+        float callQueuesHandlersFactor = 
conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
+        int numCallQueues =
+                Math.max(1, (int) Math.round(indexHandlerCount * 
callQueuesHandlersFactor));
+
+        this.minPriority = minPriority;
+        this.maxPriority = maxPriority;
+        this.delegate = delegate;
+
+        this.callExecutor =
+                new BalancedQueueRpcExecutor("Index", indexHandlerCount, 
numCallQueues,
+                        maxQueueLength);
+    }
+
+    /** Port of the RPC listener, captured in {@link #init(Context)} for the index handlers. */
+    private int port;
+
+    /**
+     * Initialize the delegate scheduler and remember the listener port so the
+     * index executor can be started later.
+     */
+    @Override
+    public void init(Context context) {
+        delegate.init(context);
+        this.port = context.getListenerAddress().getPort();
+    }
+
+    /**
+     * Start the delegate scheduler and the index call executor. The executor
+     * receives calls in {@link #dispatch(CallRunner)} and is stopped in
+     * {@link #stop()}, so it has to be started here as well.
+     */
+    @Override
+    public void start() {
+        delegate.start();
+        callExecutor.start(port);
+    }
+
+    @Override
+    public void stop() {
+        delegate.stop();
+        callExecutor.stop();
+    }
+
+    /**
+     * Route the call by the priority in its request header: priorities in
+     * [minPriority, maxPriority) go to the index executor, everything else is
+     * handed to the delegate scheduler.
+     */
+    @Override
+    public void dispatch(CallRunner callTask) throws InterruptedException, IOException {
+        int callPriority = callTask.getCall().header.getPriority();
+        boolean inIndexRange = callPriority >= minPriority && callPriority < maxPriority;
+        if (!inIndexRange) {
+            delegate.dispatch(callTask);
+            return;
+        }
+        callExecutor.dispatch(callTask);
+    }
+
+    @Override
+    public int getGeneralQueueLength() {
+        // not the best way to calculate, but don't have a better way to hook
+        // into metrics at the moment
+        return this.delegate.getGeneralQueueLength() + 
this.callExecutor.getQueueLength();
+    }
+
+    @Override
+    public int getPriorityQueueLength() {
+        return this.delegate.getPriorityQueueLength();
+    }
+
+    @Override
+    public int getReplicationQueueLength() {
+        return this.delegate.getReplicationQueueLength();
+    }
+
+    @Override
+    public int getActiveRpcHandlerCount() {
+        return this.delegate.getActiveRpcHandlerCount() + 
this.callExecutor.getActiveHandlerCount();
+    }
+
+    @VisibleForTesting
+    public void setExecutorForTesting(RpcExecutor executor) {
+        this.callExecutor = executor;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/463dabdc/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosCompat.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosCompat.java 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosCompat.java
new file mode 100644
index 0000000..5681d71
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosCompat.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.phoenix.hbase.index.ipc.PhoenixIndexRpcSchedulerFactory;
+
+/**
+ * Helper class to avoid loading HBase 0.98.3+ classes in older HBase 
installations
+ */
+public class IndexQosCompat {
+
+    private static final Log LOG = LogFactory.getLog(IndexQosCompat.class);
+
+    /**
+     * Full class name of the RpcControllerFactory. This is copied here so we 
don't need the static reference, so we can work with older versions of HBase 
0.98, which don't have this class
+     */
+    private static final String HBASE_RPC_CONTROLLER_CLASS_NAME =
+            "org.apache.hadoop.hbase.ipc.RpcControllerFactory";
+    private static volatile boolean checked = false;
+    private static boolean rpcControllerExists = false;
+
+    private IndexQosCompat() {
+        // private ctor for util class
+    }
+
+    /**
+     * @param tableName name of the index table
+     * @return configuration key for if a table should have Index QOS writes 
(its a target index
+     *         table)
+     */
+    public static String getTableIndexQosConfKey(String tableName) {
+        return "phoenix.index.table.qos._" + tableName;
+    }
+
+    /**
+     * Set the index rpc controller factory, if the HBase RpcControllerFactory exists. No-op if
+     * the RpcControllerFactory is not on the classpath.
+     * @param conf to update
+     */
+    public static void setPhoenixIndexRpcController(Configuration conf) {
+        if (rpcControllerExists()) {
+            // then we can load the class just fine. The key takes an RpcControllerFactory
+            // implementation, so point it at the index QOS controller factory rather than
+            // the server-side RpcSchedulerFactory.
+            conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
+                IndexQosRpcControllerFactory.class.getName());
+        }
+    }
+
+    /**
+     * Check, at most once, whether the HBase RpcControllerFactory class can be loaded.
+     * @return <tt>true</tt> if the class named by HBASE_RPC_CONTROLLER_CLASS_NAME is loadable
+     */
+    private static boolean rpcControllerExists() {
+        // only probe the classpath while the result has not been computed yet
+        if (!checked) {
+            synchronized (IndexQosCompat.class) {
+                // re-check under the lock: another thread may have finished the probe
+                if (!checked) {
+                    // try loading the class
+                    try {
+                        Class.forName(HBASE_RPC_CONTROLLER_CLASS_NAME);
+                        rpcControllerExists = true;
+                    } catch (ClassNotFoundException e) {
+                        LOG.warn("RpcControllerFactory doesn't exist, not setting custom index handler properties.");
+                        rpcControllerExists = false;
+                    }
+
+                    // publish the result last; 'checked' is the volatile flag readers test
+                    checked = true;
+                }
+            }
+        }
+        return rpcControllerExists;
+    }
+
+    /**
+     * Flag the given table for index QOS handling, unless the configuration already
+     * carries a value for that table's QOS key.
+     * @param conf configuration to read/update
+     * @param tableName name of the table to configure for index handlers
+     */
+    public static void enableIndexQosForTable(Configuration conf, String tableName) {
+        final String qosKey = IndexQosCompat.getTableIndexQosConfKey(tableName);
+        if (conf.get(qosKey) != null) {
+            // keep whatever value is already configured
+            return;
+        }
+        conf.setBoolean(qosKey, true);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/463dabdc/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java
new file mode 100644
index 0000000..aa8b8d1
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.phoenix.hbase.index.ipc.PhoenixIndexRpcSchedulerFactory;
+
+/**
+ * {@link RpcControllerFactory} that overrides the standard {@link 
PayloadCarryingRpcController} to
+ * allow the configured index tables (via {@link #INDEX_TABLE_NAMES_KEY}) to 
use the Index priority.
+ */
+public class IndexQosRpcControllerFactory extends RpcControllerFactory {
+
+    public static final String INDEX_TABLE_NAMES_KEY = 
"phoenix.index.rpc.controller.index-tables";
+
+    public IndexQosRpcControllerFactory(Configuration conf) {
+        super(conf);
+    }
+
+    @Override
+    public PayloadCarryingRpcController newController() {
+        PayloadCarryingRpcController delegate = super.newController();
+        return new IndexQosRpcController(delegate, conf);
+    }
+
+    @Override
+    public PayloadCarryingRpcController newController(CellScanner cellScanner) 
{
+        PayloadCarryingRpcController delegate = 
super.newController(cellScanner);
+        return new IndexQosRpcController(delegate, conf);
+    }
+
+    @Override
+    public PayloadCarryingRpcController newController(List<CellScannable> 
cellIterables) {
+        PayloadCarryingRpcController delegate = 
super.newController(cellIterables);
+        return new IndexQosRpcController(delegate, conf);
+    }
+
+    private class IndexQosRpcController extends 
DelegatingPayloadCarryingRpcController {
+
+        private Configuration conf;
+        private int priority;
+
+        public IndexQosRpcController(PayloadCarryingRpcController delegate, 
Configuration conf) {
+            super(delegate);
+            this.conf = conf;
+            this.priority = 
PhoenixIndexRpcSchedulerFactory.getMinPriority(conf);
+        }
+
+        @Override
+        public void setPriority(final TableName tn) {
+            // if its an index table, then we override to the index priority
+            if (isIndexTable(tn)) {
+                setPriority(this.priority);
+            } else {
+                super.setPriority(tn);
+            }
+        }
+
+        /**
+         * @param tn table the call is headed to
+         * @return <tt>true</tt> if the table's index QOS key (see
+         *         {@link IndexQosCompat#getTableIndexQosConfKey(String)}) is set to true
+         */
+        private boolean isIndexTable(TableName tn) {
+            String confKey = IndexQosCompat.getTableIndexQosConfKey(tn.getNameAsString());
+            // IndexQosCompat.enableIndexQosForTable sets this key to true for index tables;
+            // a missing key therefore means "not an index table"
+            return conf.getBoolean(confKey, false);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/463dabdc/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 45e5495..ba9534c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Pair;
-
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
 import org.apache.phoenix.hbase.index.parallel.Task;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/463dabdc/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index aa2225b..1c9782e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Pair;
-
 import org.apache.phoenix.hbase.index.Indexer;
 
 /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/463dabdc/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java
new file mode 100644
index 0000000..500db7c
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.ipc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ipc.PhoenixIndexRpcScheduler;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
+import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Factory to create a {@link PhoenixIndexRpcScheduler} that wraps the scheduler 
built by the
+ * {@link SimpleRpcSchedulerFactory}.
+ */
+public class PhoenixIndexRpcSchedulerFactory implements RpcSchedulerFactory {
+
+    private static final Log LOG = 
LogFactory.getLog(PhoenixIndexRpcSchedulerFactory.class);
+
+    private static final String INDEX_HANDLER_COUNT_KEY =
+            "org.apache.phoenix.regionserver.index.handler.count";
+    private static final int DEFAULT_INDEX_HANDLER_COUNT = 30;
+
+    /**
+     * HConstants#HIGH_QOS is the max we will see to a standard table. We go 
higher to differentiate
+     * and give some room for things in the middle
+     */
+    public static final int DEFAULT_INDEX_MIN_PRIORITY = 1000;
+    public static final int DEFAULT_INDEX_MAX_PRIORITY = 1050;
+    public static final String MIN_INDEX_PRIOIRTY_KEY =
+            "org.apache.phoenix.regionserver.index.priority.min";
+    public static final String MAX_INDEX_PRIOIRTY_KEY =
+            "org.apache.phoenix.regionserver.index.priority.max";
+
+    private static final String VERSION_TOO_OLD_FOR_INDEX_RPC =
+            "Running an older version of HBase (less than 0.98.4), Phoenix 
index RPC handling cannot be enabled.";
+
+    @Override
+    public RpcScheduler create(Configuration conf, RegionServerServices 
services) {
+        // create the delegate scheduler
+        RpcScheduler delegate;
+        try {
+            // happens in <=0.98.4 where the scheduler factory is not visible
+            delegate = new SimpleRpcSchedulerFactory().create(conf, services);
+        } catch (IllegalAccessError e) {
+            LOG.fatal(VERSION_TOO_OLD_FOR_INDEX_RPC);
+            throw e;
+        }
+        try {
+            // make sure we are on a version that phoenix can support
+            Class.forName("org.apache.hadoop.hbase.ipc.RpcExecutor");
+        } catch (ClassNotFoundException e) {
+            LOG.error(VERSION_TOO_OLD_FOR_INDEX_RPC
+                    + " Instead, using falling back to Simple RPC 
scheduling.");
+            return delegate;
+        }
+
+        int indexHandlerCount = conf.getInt(INDEX_HANDLER_COUNT_KEY, 
DEFAULT_INDEX_HANDLER_COUNT);
+        int minPriority = getMinPriority(conf);
+        int maxPriority = conf.getInt(MAX_INDEX_PRIOIRTY_KEY, 
DEFAULT_INDEX_MAX_PRIORITY);
+        // make sure the ranges are outside the warning ranges
+        Preconditions.checkArgument(maxPriority > minPriority, "Max index 
priority (" + maxPriority
+                + ") must be larger than min priority (" + minPriority + ")");
+        boolean allSmaller =
+                minPriority < HConstants.REPLICATION_QOS
+                        && maxPriority < HConstants.REPLICATION_QOS;
+        boolean allLarger = minPriority > HConstants.HIGH_QOS;
+        Preconditions.checkArgument(allSmaller || allLarger, "Index priority 
range (" + minPriority
+                + ",  " + maxPriority + ") must be outside HBase priority 
range ("
+                + HConstants.REPLICATION_QOS + ", " + HConstants.HIGH_QOS + 
")");
+
+        LOG.info("Using custom Phoenix Index RPC Handling with " + 
indexHandlerCount
+                + " handlers and priority range [" + minPriority + ", " + 
maxPriority + ")");
+
+        PhoenixIndexRpcScheduler scheduler =
+                new PhoenixIndexRpcScheduler(indexHandlerCount, conf, 
delegate, minPriority,
+                        maxPriority);
+        return scheduler;
+    }
+
+    public static int getMinPriority(Configuration conf) {
+        return conf.getInt(MIN_INDEX_PRIOIRTY_KEY, DEFAULT_INDEX_MIN_PRIORITY);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/463dabdc/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
index 8ef3e4f..907eb3d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.phoenix.hbase.index.table;
 
 import java.io.IOException;
@@ -28,42 +27,50 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.util.Bytes;
-
+import org.apache.phoenix.hbase.index.IndexQosCompat;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 
 public class CoprocessorHTableFactory implements HTableFactory {
 
-  /** Number of milliseconds per-interval to retry zookeeper */
-  private static final String ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL = 
"zookeeper.recovery.retry.intervalmill";
-  /** Number of retries for zookeeper */
-  private static final String ZOOKEEPER_RECOVERY_RETRY_KEY = 
"zookeeper.recovery.retry";
-  private static final Log LOG = 
LogFactory.getLog(CoprocessorHTableFactory.class);
-  private CoprocessorEnvironment e;
+    /** Number of milliseconds per-interval to retry zookeeper */
+    private static final String ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL =
+            "zookeeper.recovery.retry.intervalmill";
+    /** Number of retries for zookeeper */
+    private static final String ZOOKEEPER_RECOVERY_RETRY_KEY = 
"zookeeper.recovery.retry";
+    private static final Log LOG = 
LogFactory.getLog(CoprocessorHTableFactory.class);
+    private CoprocessorEnvironment e;
+
+    public CoprocessorHTableFactory(CoprocessorEnvironment e) {
+        this.e = e;
+    }
+
+    @Override
+    public HTableInterface getTable(ImmutableBytesPtr tablename) throws 
IOException {
+        Configuration conf = e.getConfiguration();
+        // make sure writers fail fast
+        IndexManagementUtil.setIfNotSet(conf, 
HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
+        IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_PAUSE, 
1000);
+        IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_KEY, 3);
+        IndexManagementUtil.setIfNotSet(conf, 
ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL, 100);
+        IndexManagementUtil.setIfNotSet(conf, HConstants.ZK_SESSION_TIMEOUT, 
30000);
+        IndexManagementUtil.setIfNotSet(conf, 
HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
 
-  public CoprocessorHTableFactory(CoprocessorEnvironment e) {
-    this.e = e;
-  }
+        // make sure we use the index priority writer for our rpcs
+        IndexQosCompat.setPhoenixIndexRpcController(conf);
 
-  @Override
-  public HTableInterface getTable(ImmutableBytesPtr tablename) throws 
IOException {
-    Configuration conf = e.getConfiguration();
-    // make sure writers fail fast
-    IndexManagementUtil.setIfNotSet(conf, 
HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
-    IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_PAUSE, 1000);
-    IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_KEY, 3);
-    IndexManagementUtil.setIfNotSet(conf, 
ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL, 100);
-    IndexManagementUtil.setIfNotSet(conf, HConstants.ZK_SESSION_TIMEOUT, 
30000);
-    IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_RPC_TIMEOUT_KEY, 
5000);
+        // make sure we include the index table in the tables we need to track
+        String tableName = Bytes.toString(tablename.copyBytesIfNecessary());
+        IndexQosCompat.enableIndexQosForTable(conf, tableName);
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Creating new HTable: " + 
Bytes.toString(tablename.copyBytesIfNecessary()));
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Creating new HTable: " + tableName);
+        }
+        return 
this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary()));
     }
-    return 
this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary()));
-  }
 
-  @Override
-  public void shutdown() {
-    // noop
-  }
+    @Override
+    public void shutdown() {
+        // noop
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/463dabdc/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index d4f8207..9b6fc4f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -494,6 +494,12 @@ public class IndexUtil {
                 public byte[] getRow() {
                     return cell.getRow();
                 }
+
+                @Override
+                @Deprecated
+                public int getTagsLengthUnsigned() {
+                    return cell.getTagsLengthUnsigned();
+                }
             };
             // Wrap cell in cell that offsets row key
             result.set(i, newCell);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/463dabdc/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
 
b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
new file mode 100644
index 0000000..ec18d9b
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ipc.RpcScheduler.Context;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Test that the rpc scheduler schedules index writes to the index handler queue and sends
+ * everything else to the standard queues
+ */
+public class PhoenixIndexRpcSchedulerTest {
+
+    private static final Configuration conf = HBaseConfiguration.create();
+
+    @Test
+    public void testIndexPriorityWritesToIndexHandler() throws Exception {
+        RpcScheduler mock = Mockito.mock(RpcScheduler.class);
+
+        PhoenixIndexRpcScheduler scheduler = new PhoenixIndexRpcScheduler(10, 
conf, mock, 200, 250);
+        BalancedQueueRpcExecutor executor = new 
BalancedQueueRpcExecutor("test-queue", 1, 1, 1);
+        scheduler.setExecutorForTesting(executor);
+        dispatchCallWithPriority(scheduler, 200);
+        List<BlockingQueue<CallRunner>> queues = executor.getQueues();
+        assertEquals(1, queues.size());
+        BlockingQueue<CallRunner> queue = queues.get(0);
+        queue.poll(20, TimeUnit.SECONDS);
+
+        // try again, this time we tweak the ranges we support
+        scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 101, 110);
+        scheduler.setExecutorForTesting(executor);
+        dispatchCallWithPriority(scheduler, 101);
+        queue.poll(20, TimeUnit.SECONDS);
+
+        Mockito.verify(mock, 
Mockito.times(2)).init(Mockito.any(Context.class));
+        Mockito.verifyNoMoreInteractions(mock);
+    }
+
+    /**
+     * Test that we delegate to the passed {@link RpcScheduler} when the call priority is outside
+     * the index range; the max priority is exclusive, so a call at max is delegated as well
+     * @throws Exception
+     */
+    @Test
+    public void testDelegateWhenOutsideRange() throws Exception {
+        RpcScheduler mock = Mockito.mock(RpcScheduler.class);
+        PhoenixIndexRpcScheduler scheduler = new PhoenixIndexRpcScheduler(10, 
conf, mock, 200, 250);
+        dispatchCallWithPriority(scheduler, 100);
+        dispatchCallWithPriority(scheduler, 250);
+
+        // try again, this time we tweak the ranges we support
+        scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 101, 110);
+        dispatchCallWithPriority(scheduler, 200);
+        dispatchCallWithPriority(scheduler, 110);
+
+        Mockito.verify(mock, 
Mockito.times(4)).init(Mockito.any(Context.class));
+        Mockito.verify(mock, 
Mockito.times(4)).dispatch(Mockito.any(CallRunner.class));
+        Mockito.verifyNoMoreInteractions(mock);
+    }
+
+    private void dispatchCallWithPriority(RpcScheduler scheduler, int 
priority) throws Exception {
+        CallRunner task = Mockito.mock(CallRunner.class);
+        RequestHeader header = 
RequestHeader.newBuilder().setPriority(priority).build();
+        RpcServer server = new RpcServer(null, "test-rpcserver", null, null, 
conf, scheduler);
+        RpcServer.Call call =
+                server.new Call(0, null, null, header, null, null, null, null, 
10, null);
+        Mockito.when(task.getCall()).thenReturn(call);
+
+        scheduler.dispatch(task);
+
+        Mockito.verify(task).getCall();
+        Mockito.verifyNoMoreInteractions(task);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/463dabdc/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java
 
b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java
new file mode 100644
index 0000000..0fd3d6b
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.hbase.index.ipc.PhoenixIndexRpcSchedulerFactory;
+import org.junit.Test;
+
+public class PhoenixIndexRpcSchedulerFactoryTest {
+
+    @Test
+    public void ensureInstantiation() throws Exception {
+        Configuration conf = new Configuration(false);
+        conf.setClass(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
+            PhoenixIndexRpcSchedulerFactory.class, RpcSchedulerFactory.class);
+        // Unfortunately we have to duplicate the instantiation code from the regionserver here,
+        // since it is not exposed as a static method that we could call instead
+        try {
+            Class<?> rpcSchedulerFactoryClass =
+                    
conf.getClass(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
+                        SimpleRpcSchedulerFactory.class);
+            Object o = ((RpcSchedulerFactory) 
rpcSchedulerFactoryClass.newInstance());
+            assertTrue(o instanceof PhoenixIndexRpcSchedulerFactory);
+        } catch (InstantiationException e) {
+            assertTrue("Should not have got an exception when instantiing the 
rpc scheduler: " + e,
+                false);
+        } catch (IllegalAccessException e) {
+            assertTrue("Should not have got an exception when instantiing the 
rpc scheduler: " + e,
+                false);
+        }
+    }
+
+    /**
+     * Ensure that we can't configure the index priority ranges inside the hbase ranges
+     * @throws Exception
+     */
+    @Test
+    public void testValidateIndexPriorityRanges() throws Exception {
+        Configuration conf = new Configuration(false);
+        // standard configs should be fine
+        PhoenixIndexRpcSchedulerFactory factory = new 
PhoenixIndexRpcSchedulerFactory();
+        factory.create(conf, null);
+
+        setMinMax(conf, 0, 4);
+        factory.create(conf, null);
+
+        setMinMax(conf, 101, 102);
+        factory.create(conf, null);
+
+        setMinMax(conf, 102, 101);
+        try {
+            factory.create(conf, null);
+            fail("Should not have allowed max less than min");
+        } catch (IllegalArgumentException e) {
+            // expected
+        }
+
+        setMinMax(conf, 5, 6);
+        try {
+            factory.create(conf, null);
+            fail("Should not have allowed min in range");
+        } catch (IllegalArgumentException e) {
+            // expected
+        }
+
+        setMinMax(conf, 6, 60);
+        try {
+            factory.create(conf, null);
+            fail("Should not have allowed min/max in hbase range");
+        } catch (IllegalArgumentException e) {
+            // expected
+        }
+
+        setMinMax(conf, 6, 101);
+        try {
+            factory.create(conf, null);
+            fail("Should not have allowed in range");
+        } catch (IllegalArgumentException e) {
+            // expected
+        }
+    }
+
+    private void setMinMax(Configuration conf, int min, int max) {
+        conf.setInt(PhoenixIndexRpcSchedulerFactory.MIN_INDEX_PRIOIRTY_KEY, 
min);
+        conf.setInt(PhoenixIndexRpcSchedulerFactory.MAX_INDEX_PRIOIRTY_KEY, 
max);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/463dabdc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a4df504..1e92940 100644
--- a/pom.xml
+++ b/pom.xml
@@ -67,8 +67,8 @@
     <test.output.tofile>true</test.output.tofile>
 
     <!-- Hadoop Versions -->
-    <hbase-hadoop1.version>0.98.1-hadoop1</hbase-hadoop1.version>
-    <hbase-hadoop2.version>0.98.1-hadoop2</hbase-hadoop2.version>
+    <hbase-hadoop1.version>0.98.4-hadoop1</hbase-hadoop1.version>
+    <hbase-hadoop2.version>0.98.4-hadoop2</hbase-hadoop2.version>
     <hadoop-one.version>1.0.4</hadoop-one.version>
     <hadoop-two.version>2.2.0</hadoop-two.version>
 

Reply via email to