Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.3 814afcb93 -> bd34ae79f


PHOENIX-4607 - Allow PhoenixInputFormat to use tenant-specific connections


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bd34ae79
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bd34ae79
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bd34ae79

Branch: refs/heads/4.x-HBase-1.3
Commit: bd34ae79fe4fe174968c64b87f0f45a21fb161fb
Parents: 814afcb
Author: Geoffrey <gjac...@salesforce.com>
Authored: Tue Feb 20 14:28:40 2018 -0800
Committer: Geoffrey Jacoby <gjac...@apache.org>
Committed: Fri Mar 2 12:55:52 2018 -0800

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/MapReduceIT.java | 69 +++++++++++++++-----
 .../phoenix/mapreduce/PhoenixInputFormat.java   | 41 +++++++-----
 .../util/PhoenixConfigurationUtil.java          | 15 ++++-
 .../mapreduce/util/PhoenixMapReduceUtil.java    |  5 +-
 4 files changed, 92 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/bd34ae79/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
index 68d9c9c..fb24bb2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
@@ -30,11 +30,13 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PhoenixArray;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.sql.*;
+import java.util.Properties;
 
 import static org.junit.Assert.*;
 
@@ -50,12 +52,19 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
             " STOCK_NAME VARCHAR NOT NULL , RECORDING_YEAR  INTEGER NOT  NULL, 
 RECORDINGS_QUARTER " +
             " DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( STOCK_NAME, 
RECORDING_YEAR ))";
 
+    private static final String CREATE_STOCK_VIEW = "CREATE VIEW IF NOT EXISTS 
%s (v1 VARCHAR) AS "
+        + " SELECT * FROM %s WHERE RECORDING_YEAR = 2008";
+
     private static final String MAX_RECORDING = "MAX_RECORDING";
     private  String CREATE_STOCK_STATS_TABLE =
             "CREATE TABLE IF NOT EXISTS %s(STOCK_NAME VARCHAR NOT NULL , "
                     + " MAX_RECORDING DOUBLE CONSTRAINT pk PRIMARY KEY 
(STOCK_NAME ))";
+
+
     private String UPSERT = "UPSERT into %s values (?, ?, ?)";
 
+    private String TENANT_ID = "1234567890";
+
     @Before
     public void setupTables() throws Exception {
 
@@ -63,22 +72,28 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testNoConditionsOnSelect() throws Exception {
-        Connection conn = DriverManager.getConnection(getUrl());
-        String stockTableName = generateUniqueName();
-        String stockStatsTableName = generateUniqueName();
-        conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, 
stockTableName));
-        conn.createStatement().execute(String.format(CREATE_STOCK_STATS_TABLE, 
stockStatsTableName));
-        conn.commit();
-        final Configuration conf = getUtility().getConfiguration();
-        Job job = Job.getInstance(conf);
-        PhoenixMapReduceUtil.setInput(job, StockWritable.class, 
stockTableName, null,
-                STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
-        testJob(job, stockTableName, stockStatsTableName, 91.04);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            createAndTestJob(conn, null, 91.04, null);
+        }
     }
 
     @Test
     public void testConditionsOnSelect() throws Exception {
-        Connection conn = DriverManager.getConnection(getUrl());
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            createAndTestJob(conn, RECORDING_YEAR + "  < 2009", 81.04, null);
+        }
+    }
+
+    @Test
+    public void testWithTenantId() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())){
+            //tenant view will perform the same filter as the select 
conditions do in testConditionsOnSelect
+            createAndTestJob(conn, null, 81.04, TENANT_ID);
+        }
+
+    }
+
+    private void createAndTestJob(Connection conn, String s, double v, String 
tenantId) throws SQLException, IOException, InterruptedException, 
ClassNotFoundException {
         String stockTableName = generateUniqueName();
         String stockStatsTableName = generateUniqueName();
         conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, 
stockTableName));
@@ -86,14 +101,33 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
         conn.commit();
         final Configuration conf = getUtility().getConfiguration();
         Job job = Job.getInstance(conf);
-        PhoenixMapReduceUtil.setInput(job, StockWritable.class, 
stockTableName, RECORDING_YEAR+"  < 2009",
+        if (tenantId != null) {
+            setInputForTenant(job, tenantId, stockTableName, s);
+
+        } else {
+            PhoenixMapReduceUtil.setInput(job, StockWritable.class, 
stockTableName, s,
                 STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
-        testJob(job, stockTableName, stockStatsTableName, 81.04);
+        }
+        testJob(conn, job, stockTableName, stockStatsTableName, v);
+
+    }
+
+    private void setInputForTenant(Job job, String tenantId, String 
stockTableName, String s) throws SQLException {
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, TENANT_ID);
+        try (Connection tenantConn = DriverManager.getConnection(getUrl(), 
props)){
+            PhoenixMapReduceUtil.setTenantId(job, tenantId);
+            String stockViewName = generateUniqueName();
+            
tenantConn.createStatement().execute(String.format(CREATE_STOCK_VIEW, 
stockViewName, stockTableName));
+            tenantConn.commit();
+            PhoenixMapReduceUtil.setInput(job, StockWritable.class, 
stockViewName, s,
+                STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+        }
     }
 
-    private void testJob(Job job, String stockTableName, String 
stockStatsTableName, double expectedMax)
+    private void testJob(Connection conn, Job job, String stockTableName, 
String stockStatsTableName, double expectedMax)
             throws SQLException, InterruptedException, IOException, 
ClassNotFoundException {
-        upsertData(stockTableName);
+        upsertData(conn, stockTableName);
 
         // only run locally, rather than having to spin up a MiniMapReduce 
cluster and lets us use breakpoints
         job.getConfiguration().set("mapreduce.framework.name", "local");
@@ -135,8 +169,7 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
         job.setOutputFormatClass(PhoenixOutputFormat.class);
     }
 
-    private void upsertData(String stockTableName) throws SQLException {
-        Connection conn = DriverManager.getConnection(getUrl());
+    private void upsertData(Connection conn, String stockTableName) throws 
SQLException {
         PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, 
stockTableName));
         upsertData(stmt, "AAPL", 2009, new Double[]{85.88, 91.04, 88.5, 90.3});
         upsertData(stmt, "AAPL", 2008, new Double[]{75.88, 81.04, 78.5, 80.3});

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bd34ae79/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 9f16cc1..6093edd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -169,32 +169,39 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
         try {
             final String txnScnValue = 
configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
             final String currentScnValue = 
configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+            final String tenantId = 
configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
             final Properties overridingProps = new Properties();
             if(txnScnValue==null && currentScnValue!=null) {
                 overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
currentScnValue);
             }
-            final Connection connection = 
ConnectionUtil.getInputConnection(configuration, overridingProps);
-            final String selectStatement = 
PhoenixConfigurationUtil.getSelectStatement(configuration);
-            Preconditions.checkNotNull(selectStatement);
-            final Statement statement = connection.createStatement();
-            final PhoenixStatement pstmt = 
statement.unwrap(PhoenixStatement.class);
-            // Optimize the query plan so that we potentially use secondary 
indexes            
-            final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
-            final Scan scan = queryPlan.getContext().getScan();
-            // since we can't set a scn on connections with txn set TX_SCN 
attribute so that the max time range is set by BaseScannerRegionObserver 
-            if (txnScnValue!=null) {
-                scan.setAttribute(BaseScannerRegionObserver.TX_SCN, 
Bytes.toBytes(Long.valueOf(txnScnValue)));
+            if (tenantId != null && 
configuration.get(PhoenixRuntime.TENANT_ID_ATTRIB) == null){
+                overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
             }
+            try (final Connection connection = 
ConnectionUtil.getInputConnection(configuration, overridingProps);
+                 final Statement statement = connection.createStatement()) {
+
+              final String selectStatement = 
PhoenixConfigurationUtil.getSelectStatement(configuration);
+              Preconditions.checkNotNull(selectStatement);
+
+              final PhoenixStatement pstmt = 
statement.unwrap(PhoenixStatement.class);
+              // Optimize the query plan so that we potentially use secondary 
indexes
+              final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+              final Scan scan = queryPlan.getContext().getScan();
+              // since we can't set a scn on connections with txn set TX_SCN 
attribute so that the max time range is set by BaseScannerRegionObserver
+              if (txnScnValue != null) {
+                scan.setAttribute(BaseScannerRegionObserver.TX_SCN, 
Bytes.toBytes(Long.valueOf(txnScnValue)));
+              }
 
-            // setting the snapshot configuration
-            String snapshotName = 
configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
-            if (snapshotName != null)
+              // setting the snapshot configuration
+              String snapshotName = 
configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+              if (snapshotName != null)
                 
PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().
                     getQueryServices().getConfiguration(), snapshotName);
 
-            // Initialize the query plan so it sets up the parallel scans
-            queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
-            return queryPlan;
+              // Initialize the query plan so it sets up the parallel scans
+              queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
+              return queryPlan;
+            }
         } catch (Exception exception) {
             LOG.error(String.format("Failed to get the query plan with error 
[%s]",
                 exception.getMessage()));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bd34ae79/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 3c27f65..f3f0415 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -26,6 +26,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -147,6 +148,8 @@ public final class PhoenixConfigurationUtil {
 
     public static final String RESTORE_DIR_KEY = 
"phoenix.tableSnapshot.restore.dir";
 
+    public static final String MAPREDUCE_TENANT_ID = 
"phoenix.mapreduce.tenantid";
+
     public enum SchemaType {
         TABLE,
         QUERY;
@@ -343,7 +346,12 @@ public final class PhoenixConfigurationUtil {
         }
         final String tableName = getInputTableName(configuration);
         Preconditions.checkNotNull(tableName);
-        final Connection connection = 
ConnectionUtil.getInputConnection(configuration);
+        Properties props = new Properties();
+        String tenantId = 
configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+        if (tenantId != null) {
+            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        final Connection connection = 
ConnectionUtil.getInputConnection(configuration, props);
         final List<String> selectColumnList = 
getSelectColumnList(configuration);
         columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, 
tableName, selectColumnList);
         // we put the encoded column infos in the Configuration for re 
usability.
@@ -658,4 +666,9 @@ public final class PhoenixConfigurationUtil {
            return conn.getQueryServices().getConfiguration()
                    .getBoolean(USE_STATS_FOR_PARALLELIZATION, 
DEFAULT_USE_STATS_FOR_PARALLELIZATION);
        }
+
+    public static void setTenantId(Configuration configuration, String 
tenantId){
+        Preconditions.checkNotNull(configuration);
+        configuration.set(MAPREDUCE_TENANT_ID, tenantId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bd34ae79/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index b0981ef..3462177 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -147,7 +147,6 @@ public final class PhoenixMapReduceUtil {
     /**
      *
      * @param job
-     * @param outputClass
      * @param tableName  Output table
      * @param columns    List of columns separated by ,
      */
@@ -162,7 +161,6 @@ public final class PhoenixMapReduceUtil {
     /**
      *
      * @param job
-     * @param outputClass
      * @param tableName  Output table
      * @param fieldNames fields
      */
@@ -183,5 +181,8 @@ public final class PhoenixMapReduceUtil {
         PhoenixConfigurationUtil.setOutputCluster(configuration, quorum);
     }
 
+    public static void setTenantId(final Job job, final String tenantId) {
+        PhoenixConfigurationUtil.setTenantId(job.getConfiguration(), tenantId);
+    }
 
 }

Reply via email to