Repository: phoenix
Updated Branches:
  refs/heads/txn b14e88551 -> 30bfa7304
PHOENIX-2373 Change ReserveNSequence UDF to take in zookeeper and tenantId

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8a5046e4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8a5046e4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8a5046e4

Branch: refs/heads/txn
Commit: 8a5046e4044d77938a13d4abf7bee2bdca581917
Parents: e1e4344
Author: Siddhi <[email protected]>
Authored: Thu Nov 5 17:23:53 2015 -0800
Committer: Jan <[email protected]>
Committed: Tue Nov 10 11:07:29 2015 -0800

----------------------------------------------------------------------
 .../phoenix/pig/udf/ReserveNSequenceTestIT.java | 86 +++++++++++++++----
 .../phoenix/pig/udf/ReserveNSequence.java       | 88 +++++++++++++++-----
 2 files changed, 137 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8a5046e4/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java
index 05c7c82..cea1a8a 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/udf/ReserveNSequenceTestIT.java
@@ -27,10 +27,12 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.UDFContext;
@@ -52,7 +54,7 @@ public class ReserveNSequenceTestIT extends BaseHBaseManagedTimeIT {
     private static final long MAX_VALUE = 10;
 
     private static TupleFactory TF;
-    private static Connection conn;
+    private static Connection globalConn;
     private static String zkQuorum;
     private static Configuration conf;
     private static UDFContext udfContext;
@@ -65,15 +67,14 @@ public class ReserveNSequenceTestIT extends BaseHBaseManagedTimeIT {
         conf = getTestClusterConfig();
         zkQuorum = LOCALHOST + JDBC_PROTOCOL_SEPARATOR + getZKClientPort(getTestClusterConfig());
         conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
-        // Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
-        conn = DriverManager.getConnection(getUrl());
+        globalConn = DriverManager.getConnection(getUrl());
         // Pig variables
         TF = TupleFactory.getInstance();
     }
 
     @Before
     public void setUp() throws SQLException {
-        createSequence();
+        createSequence(globalConn);
         createUdfContext();
     }
 
@@ -140,18 +141,69 @@ public class ReserveNSequenceTestIT extends BaseHBaseManagedTimeIT {
         props.setErrorMessage("Sequence undefined");
         doTest(props);
     }
+
+    /**
+     * Test reserving a sequence with a tenant id passed to the UDF.
+     * @throws Exception
+     */
+    @Test
+    public void testTenantSequence() throws Exception {
+        Properties tenantProps = new Properties();
+        String tenantId = "TENANT";
+        tenantProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        Connection tenantConn = DriverManager.getConnection(getUrl(), tenantProps);
+        createSequence(tenantConn);
+
+        try {
+            UDFTestProperties props = new UDFTestProperties(3);
+
+            // validate that the UDF reservation is made for that tenant
+            doTest(tenantConn, props);
+
+            // validate that the global sequence value is still set to 1
+            assertEquals(1L, getNextSequenceValue(globalConn));
+        } finally {
+            dropSequence(tenantConn);
+        }
+    }
+
+    /**
+     * Test using the UDF to reserve sequence values across multiple tuples.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMultipleTuples() throws Exception {
+        Tuple tuple = TF.newTuple(2);
+        tuple.set(0, 2L);
+        tuple.set(1, SEQUENCE_NAME);
+        final String tenantId = globalConn.getClientInfo(PhoenixRuntime.TENANT_ID_ATTRIB);
+        ReserveNSequence udf = new ReserveNSequence(zkQuorum, tenantId);
+
+        for (int i = 0; i < 2; i++) {
+            udf.exec(tuple);
+        }
+        long nextValue = getNextSequenceValue(globalConn);
+        assertEquals(5L, nextValue);
+    }
+
     private void doTest(UDFTestProperties props) throws Exception {
-        setCurrentValue(props.getCurrentValue());
+        doTest(globalConn, props);
+    }
+
+    private void doTest(Connection conn, UDFTestProperties props) throws Exception {
+        setCurrentValue(conn, props.getCurrentValue());
         Tuple tuple = TF.newTuple(3);
         tuple.set(0, props.getNumToReserve());
         tuple.set(1, props.getSequenceName());
         tuple.set(2, zkQuorum);
         Long result = null;
         try {
-            ReserveNSequence udf = new ReserveNSequence();
+            final String tenantId = conn.getClientInfo(PhoenixRuntime.TENANT_ID_ATTRIB);
+            ReserveNSequence udf = new ReserveNSequence(zkQuorum, tenantId);
             result = udf.exec(tuple);
-            validateReservedSequence(props.getCurrentValue(), props.getNumToReserve(), result);
+            validateReservedSequence(conn, props.getCurrentValue(), props.getNumToReserve(), result);
         } catch (Exception e) {
             if (props.isExceptionExpected()) {
                 assertEquals(props.getExceptionClass(), e.getClass());
@@ -160,34 +212,32 @@ public class ReserveNSequenceTestIT extends BaseHBaseManagedTimeIT {
                 throw e;
             }
         }
-
     }
 
     private void createUdfContext() {
-        conf.set(ReserveNSequence.SEQUENCE_NAME_CONF_KEY, SEQUENCE_NAME);
         udfContext = UDFContext.getUDFContext();
         udfContext.addJobConf(conf);
     }
 
-    private void validateReservedSequence(Long currentValue, long count, Long result) throws SQLException {
+    private void validateReservedSequence(Connection conn, Long currentValue, long count, Long result) throws SQLException {
         Long startIndex = currentValue + 1;
         assertEquals("Start index is incorrect", startIndex, result);
-        final long newNextSequenceValue = getNextSequenceValue();
+        final long newNextSequenceValue = getNextSequenceValue(conn);
         assertEquals(startIndex + count, newNextSequenceValue);
     }
 
-    private void createSequence() throws SQLException {
+    private void createSequence(Connection conn) throws SQLException {
         conn.createStatement().execute(String.format(CREATE_SEQUENCE_SYNTAX, SEQUENCE_NAME, 1, 1, 1, MAX_VALUE, 1));
         conn.commit();
     }
 
-    private void setCurrentValue(long currentValue) throws SQLException {
+    private void setCurrentValue(Connection conn, long currentValue) throws SQLException {
         for (int i = 1; i <= currentValue; i++) {
-            getNextSequenceValue();
+            getNextSequenceValue(conn);
         }
     }
 
-    private long getNextSequenceValue() throws SQLException {
+    private long getNextSequenceValue(Connection conn) throws SQLException {
         String ddl = new StringBuilder().append("SELECT NEXT VALUE FOR ").append(SEQUENCE_NAME).toString();
         ResultSet rs = conn.createStatement().executeQuery(ddl);
         assertTrue(rs.next());
@@ -195,7 +245,7 @@ public class ReserveNSequenceTestIT extends BaseHBaseManagedTimeIT {
         return rs.getLong(1);
     }
 
-    private void dropSequence() throws Exception {
+    private void dropSequence(Connection conn) throws Exception {
         String ddl = new StringBuilder().append("DROP SEQUENCE ").append(SEQUENCE_NAME).toString();
         conn.createStatement().execute(ddl);
         conn.commit();
@@ -204,12 +254,12 @@ public class ReserveNSequenceTestIT extends BaseHBaseManagedTimeIT {
     @After
     public void tearDown() throws Exception {
         udfContext.reset();
-        dropSequence();
+        dropSequence(globalConn);
     }
 
     @AfterClass
     public static void tearDownAfterClass() throws Exception {
-        conn.close();
+        globalConn.close();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8a5046e4/phoenix-pig/src/main/java/org/apache/phoenix/pig/udf/ReserveNSequence.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/udf/ReserveNSequence.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/udf/ReserveNSequence.java
index 308f170..6187d5e 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/udf/ReserveNSequence.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/udf/ReserveNSequence.java
@@ -1,17 +1,22 @@
 /**
- *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
 */
 package org.apache.phoenix.pig.udf;
 
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
- * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
- * governing permissions and limitations under the License.
- */
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -20,11 +25,16 @@ import java.sql.SQLException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.UDFContext;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
 
 /**
  * UDF to Reserve a chunk of numbers for a given sequence
@@ -34,29 +44,37 @@ import com.google.common.base.Preconditions;
  */
 public class ReserveNSequence extends EvalFunc<Long> {
 
-    public static final String INVALID_TUPLE_MESSAGE = "Tuple should have correct fields(NumtoReserve,SequenceName,zkquorum.";
+    public static final String INVALID_TUPLE_MESSAGE = "Tuple should have correct fields(NumtoReserve,SequenceName).";
     public static final String EMPTY_SEQUENCE_NAME_MESSAGE = "Sequence name should be not null";
     public static final String EMPTY_ZK_MESSAGE = "ZKQuorum should be not null";
     public static final String INVALID_NUMBER_MESSAGE = "Number of Sequences to Reserve should be greater than 0";
-    public static final String SEQUENCE_NAME_CONF_KEY = "phoenix.sequence.name";
+    private final String zkQuorum;
+    private final String tenantId;
+    private Configuration configuration;
+    Connection connection;
+
+    public ReserveNSequence(@NonNull String zkQuorum, @Nullable String tenantId) {
+        Preconditions.checkNotNull(zkQuorum, EMPTY_ZK_MESSAGE);
+        this.zkQuorum = zkQuorum;
+        this.tenantId = tenantId;
+    }
 
     /**
      * Reserve N next sequences for a sequence name. N is the first field in the tuple. Sequence name is the second
     * field in the tuple zkquorum is the third field in the tuple
     */
     @Override
     public Long exec(Tuple input) throws IOException {
-        Preconditions.checkArgument(input != null && input.size() == 3, INVALID_TUPLE_MESSAGE);
+        Preconditions.checkArgument(input != null && input.size() >= 2, INVALID_TUPLE_MESSAGE);
         Long numToReserve = (Long)(input.get(0));
         Preconditions.checkArgument(numToReserve > 0, INVALID_NUMBER_MESSAGE);
         String sequenceName = (String)input.get(1);
         Preconditions.checkNotNull(sequenceName, EMPTY_SEQUENCE_NAME_MESSAGE);
-        String zkquorum = (String)input.get(2);
-        Preconditions.checkNotNull(zkquorum, EMPTY_ZK_MESSAGE);
-        UDFContext context = UDFContext.getUDFContext();
-        Configuration configuration = context.getJobConf();
-        configuration.set(HConstants.ZOOKEEPER_QUORUM, zkquorum);
-        Connection connection = null;
+        // A connection is created when the UDF is called for the first Tuple per task.
+        // The connection gets cleaned up in the finish() method.
+        if (connection == null) {
+            initConnection();
+        }
         ResultSet rs = null;
         try {
             connection = ConnectionUtil.getOutputConnection(configuration);
@@ -79,6 +97,38 @@ public class ReserveNSequence extends EvalFunc<Long> {
             }
         }
     }
+
+    /**
+     * Cleanup to be performed at the end.
+     * Close connection
+     */
+    @Override
+    public void finish() {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                throw new RuntimeException("Caught exception while closing connection", e);
+            }
+        }
+    }
+
+    private void initConnection() throws IOException {
+        // Create correct configuration to be used to make phoenix connections
+        UDFContext context = UDFContext.getUDFContext();
+        configuration = new Configuration(context.getJobConf());
+        configuration.set(HConstants.ZOOKEEPER_QUORUM, this.zkQuorum);
+        if (Strings.isNullOrEmpty(tenantId)) {
+            configuration.unset(PhoenixRuntime.TENANT_ID_ATTRIB);
+        } else {
+            configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        try {
+            connection = ConnectionUtil.getOutputConnection(configuration);
+        } catch (SQLException e) {
+            throw new IOException("Caught exception while creating connection", e);
+        }
+    }
 
     private String getNextNSequenceSelectStatement(Long numToReserve, String sequenceName) {
         return new StringBuilder().append("SELECT NEXT " + numToReserve + " VALUES" + " FOR ").append(sequenceName)
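For context, a minimal sketch of how the reworked UDF can now be driven: the ZooKeeper quorum and tenant id move into the constructor, and the input tuple shrinks to (NumToReserve, SequenceName). This is modeled on the integration test above and is not part of the commit; the quorum string, sequence name, reserve count, and the standalone-driver setup (registering a job Configuration with the UDFContext) are illustrative assumptions, and a running Phoenix cluster with the sequence already created is assumed.

import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.pig.udf.ReserveNSequence;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.UDFContext;

public class ReserveNSequenceExample {
    public static void main(String[] args) throws Exception {
        // Outside of a live Pig task the UDFContext carries no job configuration,
        // so register one explicitly, as the integration test does.
        UDFContext.getUDFContext().addJobConf(new Configuration());

        // zkQuorum is now a constructor argument; tenantId may be null for the global view.
        ReserveNSequence udf = new ReserveNSequence("localhost:2181", null);

        // The tuple now only needs (NumToReserve, SequenceName).
        Tuple tuple = TupleFactory.getInstance().newTuple(2);
        tuple.set(0, 100L);           // number of sequence values to reserve
        tuple.set(1, "MY_SEQUENCE");  // sequence name (placeholder)

        // Returns the first value of the reserved range; the connection is
        // opened lazily on the first call and reused for later tuples.
        Long firstReserved = udf.exec(tuple);
        System.out.println("Reserved range starts at " + firstReserved);

        udf.finish(); // closes the connection opened by initConnection()
    }
}

In a Pig script, the same two values would be supplied as constructor arguments when the UDF is DEFINEd, rather than as extra tuple fields.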
