Repository: phoenix
Updated Branches:
  refs/heads/txn b14e88551 -> 30bfa7304


PHOENIX-2373- Change ReserveNSequence Udf to take in zookeeper and tentantId


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8a5046e4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8a5046e4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8a5046e4

Branch: refs/heads/txn
Commit: 8a5046e4044d77938a13d4abf7bee2bdca581917
Parents: e1e4344
Author: Siddhi <[email protected]>
Authored: Thu Nov 5 17:23:53 2015 -0800
Committer: Jan <[email protected]>
Committed: Tue Nov 10 11:07:29 2015 -0800

----------------------------------------------------------------------
 .../phoenix/pig/udf/ReserveNSequenceTestIT.java | 86 +++++++++++++++----
 .../phoenix/pig/udf/ReserveNSequence.java       | 88 +++++++++++++++-----
 2 files changed, 137 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8a5046e4/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java
 
b/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java
index 05c7c82..cea1a8a 100644
--- 
a/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java
+++ 
b/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java
@@ -27,10 +27,12 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.UDFContext;
@@ -52,7 +54,7 @@ public class ReserveNSequenceTestIT extends 
BaseHBaseManagedTimeIT {
     private static final long MAX_VALUE = 10;
 
     private static TupleFactory TF;
-    private static Connection conn;
+    private static Connection globalConn;
     private static String zkQuorum;
     private static Configuration conf;
     private static UDFContext udfContext;
@@ -65,15 +67,14 @@ public class ReserveNSequenceTestIT extends 
BaseHBaseManagedTimeIT {
         conf = getTestClusterConfig();
         zkQuorum = LOCALHOST + JDBC_PROTOCOL_SEPARATOR + 
getZKClientPort(getTestClusterConfig());
         conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
-        // Properties props = 
PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
-        conn = DriverManager.getConnection(getUrl());
+        globalConn = DriverManager.getConnection(getUrl());
         // Pig variables
         TF = TupleFactory.getInstance();
     }
 
     @Before
     public void setUp() throws SQLException {
-        createSequence();
+        createSequence(globalConn);
         createUdfContext();
     }
 
@@ -140,18 +141,69 @@ public class ReserveNSequenceTestIT extends 
BaseHBaseManagedTimeIT {
         props.setErrorMessage("Sequence undefined");
         doTest(props);
     }
+    
+    /**
+     * Test reserving sequence with tenant Id passed to udf.
+     * @throws Exception
+     */
+    @Test
+    public void testTenantSequence() throws Exception {
+        Properties tentantProps = new Properties();
+        String tenantId = "TENANT";
+        tentantProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        Connection tenantConn = DriverManager.getConnection(getUrl(), 
tentantProps);
+        createSequence(tenantConn);
+
+        try {
+            UDFTestProperties props = new UDFTestProperties(3);
+
+            // validates UDF reservation is for that tentant
+            doTest(tenantConn, props);
+
+            // validate global sequence value is still set to 1
+            assertEquals(1L, getNextSequenceValue(globalConn));
+        } finally {
+            dropSequence(tenantConn);
+        }
+    }
+    
+    /**
+     * Test Use the udf to reserve multiple tuples
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testMultipleTuples() throws Exception {
+        Tuple tuple = TF.newTuple(2);
+        tuple.set(0, 2L);
+        tuple.set(1, SEQUENCE_NAME);
 
+        final String tentantId = 
globalConn.getClientInfo(PhoenixRuntime.TENANT_ID_ATTRIB);
+        ReserveNSequence udf = new ReserveNSequence(zkQuorum, tentantId);
+
+        for (int i = 0; i < 2; i++) {
+            udf.exec(tuple);
+        }
+        long nextValue = getNextSequenceValue(globalConn);
+        assertEquals(5L, nextValue);
+    }
+    
     private void doTest(UDFTestProperties props) throws Exception {
-        setCurrentValue(props.getCurrentValue());
+        doTest(globalConn, props);
+    }
+
+    private void doTest(Connection conn, UDFTestProperties props) throws 
Exception {
+        setCurrentValue(conn, props.getCurrentValue());
         Tuple tuple = TF.newTuple(3);
         tuple.set(0, props.getNumToReserve());
         tuple.set(1, props.getSequenceName());
         tuple.set(2, zkQuorum);
         Long result = null;
         try {
-            ReserveNSequence udf = new ReserveNSequence();
+            final String tenantId = 
conn.getClientInfo(PhoenixRuntime.TENANT_ID_ATTRIB);
+            ReserveNSequence udf = new ReserveNSequence(zkQuorum, tenantId);
             result = udf.exec(tuple);
-            validateReservedSequence(props.getCurrentValue(), 
props.getNumToReserve(), result);
+            validateReservedSequence(conn, props.getCurrentValue(), 
props.getNumToReserve(), result);
         } catch (Exception e) {
             if (props.isExceptionExpected()) {
                 assertEquals(props.getExceptionClass(), e.getClass());
@@ -160,34 +212,32 @@ public class ReserveNSequenceTestIT extends 
BaseHBaseManagedTimeIT {
                 throw e;
             }
         }
-
     }
 
     private void createUdfContext() {
-        conf.set(ReserveNSequence.SEQUENCE_NAME_CONF_KEY, SEQUENCE_NAME);
         udfContext = UDFContext.getUDFContext();
         udfContext.addJobConf(conf);
     }
 
-    private void validateReservedSequence(Long currentValue, long count, Long 
result) throws SQLException {
+    private void validateReservedSequence(Connection conn, Long currentValue, 
long count, Long result) throws SQLException {
         Long startIndex = currentValue + 1;
         assertEquals("Start index is incorrect", startIndex, result);
-        final long newNextSequenceValue = getNextSequenceValue();
+        final long newNextSequenceValue = getNextSequenceValue(conn);
         assertEquals(startIndex + count, newNextSequenceValue);
     }
 
-    private void createSequence() throws SQLException {
+    private void createSequence(Connection conn) throws SQLException {
         conn.createStatement().execute(String.format(CREATE_SEQUENCE_SYNTAX, 
SEQUENCE_NAME, 1, 1, 1, MAX_VALUE, 1));
         conn.commit();
     }
 
-    private void setCurrentValue(long currentValue) throws SQLException {
+    private void setCurrentValue(Connection conn, long currentValue) throws 
SQLException {
         for (int i = 1; i <= currentValue; i++) {
-            getNextSequenceValue();
+            getNextSequenceValue(conn);
         }
     }
 
-    private long getNextSequenceValue() throws SQLException {
+    private long getNextSequenceValue(Connection conn) throws SQLException {
         String ddl = new StringBuilder().append("SELECT NEXT VALUE FOR 
").append(SEQUENCE_NAME).toString();
         ResultSet rs = conn.createStatement().executeQuery(ddl);
         assertTrue(rs.next());
@@ -195,7 +245,7 @@ public class ReserveNSequenceTestIT extends 
BaseHBaseManagedTimeIT {
         return rs.getLong(1);
     }
 
-    private void dropSequence() throws Exception {
+    private void dropSequence(Connection conn) throws Exception {
         String ddl = new StringBuilder().append("DROP SEQUENCE 
").append(SEQUENCE_NAME).toString();
         conn.createStatement().execute(ddl);
         conn.commit();
@@ -204,12 +254,12 @@ public class ReserveNSequenceTestIT extends 
BaseHBaseManagedTimeIT {
     @After
     public void tearDown() throws Exception {
         udfContext.reset();
-        dropSequence();
+        dropSequence(globalConn);
     }
 
     @AfterClass
     public static void tearDownAfterClass() throws Exception {
-        conn.close();
+        globalConn.close();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8a5046e4/phoenix-pig/src/main/java/org/apache/phoenix/pig/udf/ReserveNSequence.java
----------------------------------------------------------------------
diff --git 
a/phoenix-pig/src/main/java/org/apache/phoenix/pig/udf/ReserveNSequence.java 
b/phoenix-pig/src/main/java/org/apache/phoenix/pig/udf/ReserveNSequence.java
index 308f170..6187d5e 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/udf/ReserveNSequence.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/udf/ReserveNSequence.java
@@ -1,17 +1,22 @@
 /**
- * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.phoenix.pig.udf;
 
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 Unless required by
- * applicable law or agreed to in writing, software distributed under the 
License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language
- * governing permissions and limitations under the License.
- */
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -20,11 +25,16 @@ import java.sql.SQLException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.UDFContext;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
 
 /**
  * UDF to Reserve a chunk of numbers for a given sequence
@@ -34,29 +44,37 @@ import com.google.common.base.Preconditions;
  */
 public class ReserveNSequence extends EvalFunc<Long> {
 
-    public static final String INVALID_TUPLE_MESSAGE = "Tuple should have 
correct fields(NumtoReserve,SequenceName,zkquorum.";
+    public static final String INVALID_TUPLE_MESSAGE = "Tuple should have 
correct fields(NumtoReserve,SequenceName).";
     public static final String EMPTY_SEQUENCE_NAME_MESSAGE = "Sequence name 
should be not null";
     public static final String EMPTY_ZK_MESSAGE = "ZKQuorum should be not 
null";
     public static final String INVALID_NUMBER_MESSAGE = "Number of Sequences 
to Reserve should be greater than 0";
-    public static final String SEQUENCE_NAME_CONF_KEY = 
"phoenix.sequence.name";
 
+    private final String zkQuorum;
+    private final String tenantId;
+    private Configuration configuration;
+    Connection connection;
+    
+    public ReserveNSequence(@NonNull String zkQuorum, @Nullable String 
tenantId) {
+        Preconditions.checkNotNull(zkQuorum, EMPTY_ZK_MESSAGE);
+        this.zkQuorum = zkQuorum;
+        this.tenantId = tenantId;
+    }
     /**
      * Reserve N next sequences for a sequence name. N is the first field in 
the tuple. Sequence name is the second
      * field in the tuple zkquorum is the third field in the tuple
      */
     @Override
     public Long exec(Tuple input) throws IOException {
-        Preconditions.checkArgument(input != null && input.size() == 3, 
INVALID_TUPLE_MESSAGE);
+        Preconditions.checkArgument(input != null && input.size() >= 2, 
INVALID_TUPLE_MESSAGE);
         Long numToReserve = (Long)(input.get(0));
         Preconditions.checkArgument(numToReserve > 0, INVALID_NUMBER_MESSAGE);
         String sequenceName = (String)input.get(1);
         Preconditions.checkNotNull(sequenceName, EMPTY_SEQUENCE_NAME_MESSAGE);
-        String zkquorum = (String)input.get(2);
-        Preconditions.checkNotNull(zkquorum, EMPTY_ZK_MESSAGE);
-        UDFContext context = UDFContext.getUDFContext();
-        Configuration configuration = context.getJobConf();
-        configuration.set(HConstants.ZOOKEEPER_QUORUM, zkquorum);
-        Connection connection = null;
+        // It will create a connection when called for the first Tuple per 
task.
+        // The connection gets cleaned up in finish() method
+        if (connection == null) {
+            initConnection();
+        }
         ResultSet rs = null;
         try {
             connection = ConnectionUtil.getOutputConnection(configuration);
@@ -79,6 +97,38 @@ public class ReserveNSequence extends EvalFunc<Long> {
             }
         }
     }
+    
+    /**
+     * Cleanup to be performed at the end.
+     * Close connection
+     */
+    @Override
+    public void finish() {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                throw new RuntimeException("Caught exception while closing 
connection", e);
+            }
+        }
+    }
+    
+    private void initConnection() throws IOException {
+        // Create correct configuration to be used to make phoenix connections
+        UDFContext context = UDFContext.getUDFContext();
+        configuration = new Configuration(context.getJobConf());
+        configuration.set(HConstants.ZOOKEEPER_QUORUM, this.zkQuorum);
+        if (Strings.isNullOrEmpty(tenantId)) {
+            configuration.unset(PhoenixRuntime.TENANT_ID_ATTRIB);
+        } else {
+            configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        try {
+            connection = ConnectionUtil.getOutputConnection(configuration);
+        } catch (SQLException e) {
+            throw new IOException("Caught exception while creating 
connection", e);
+        }
+    }
 
     private String getNextNSequenceSelectStatement(Long numToReserve, String 
sequenceName) {
         return new StringBuilder().append("SELECT NEXT " + numToReserve + " 
VALUES" + " FOR ").append(sequenceName)

Reply via email to