PHOENIX-2674 PhoenixMapReduceUtil#setInput doesn't honor condition clause

Pass the condition clause given to PhoenixMapReduceUtil#setInput through
to the job configuration, and remove the code duplicated between the
setInput() overloads. Add a test that covers MapReduce input with and
without a condition.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8ece81b5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8ece81b5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8ece81b5

Branch: refs/heads/calcite
Commit: 8ece81b5522df3e6bd9dfdb3112e101215bb49f1
Parents: 0c1fd3a
Author: Jesse Yates <jya...@apache.org>
Authored: Wed Feb 10 12:46:47 2016 -0800
Committer: Jesse Yates <jya...@apache.org>
Committed: Fri Feb 12 12:15:42 2016 -0800

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/MapReduceIT.java | 230 +++++++++++++++++++
 .../mapreduce/util/PhoenixMapReduceUtil.java    |  65 +++---
 2 files changed, 264 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece81b5/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
new file mode 100644
index 0000000..f030701
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.schema.types.PDouble;
+import org.apache.phoenix.schema.types.PhoenixArray;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.*;
+
+import static org.junit.Assert.*;
+
+/**
+ * Integration test for the basic Phoenix MapReduce tooling: reads the {@code stock} table through
+ * {@link PhoenixMapReduceUtil#setInput}, computes the maximum recording per stock and writes it to the
+ * {@code stock_stats} table through {@link PhoenixOutputFormat}. Input is exercised both with and
+ * without a WHERE-clause condition.
+ */
+public class MapReduceIT extends BaseHBaseManagedTimeIT {
+
+    private static final String STOCK_TABLE_NAME = "stock";
+    private static final String STOCK_STATS_TABLE_NAME = "stock_stats";
+    private static final String STOCK_NAME = "STOCK_NAME";
+    private static final String RECORDING_YEAR = "RECORDING_YEAR";
+    private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
+    private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE_NAME + " ( " +
+            STOCK_NAME + " VARCHAR NOT NULL ," + RECORDING_YEAR + " INTEGER NOT  NULL, " + RECORDINGS_QUARTER +
+            " DOUBLE array[] CONSTRAINT pk PRIMARY KEY (" + STOCK_NAME + " , " + RECORDING_YEAR + "))";
+
+    private static final String MAX_RECORDING = "MAX_RECORDING";
+    private static final String CREATE_STOCK_STATS_TABLE =
+            "CREATE TABLE IF NOT EXISTS " + STOCK_STATS_TABLE_NAME + "(" + STOCK_NAME + " VARCHAR NOT NULL , "
+                    + MAX_RECORDING + " DOUBLE CONSTRAINT pk PRIMARY KEY (" + STOCK_NAME + "))";
+    private static final String UPSERT = "UPSERT into " + STOCK_TABLE_NAME + " values (?, ?, ?)";
+
+    @Before
+    public void setupTables() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl());
+             Statement stmt = conn.createStatement()) {
+            stmt.execute(CREATE_STOCK_TABLE);
+            stmt.execute(CREATE_STOCK_STATS_TABLE);
+            conn.commit();
+        }
+    }
+
+    @Test
+    public void testNoConditionsOnSelect() throws Exception {
+        final Configuration conf = getUtility().getConfiguration();
+        Job job = Job.getInstance(conf);
+        PhoenixMapReduceUtil.setInput(job, StockWritable.class, STOCK_TABLE_NAME, null,
+                STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+        // no condition: both years are read, so the maximum comes from the 2009 recordings
+        testJob(job, 91.04);
+    }
+
+    @Test
+    public void testConditionsOnSelect() throws Exception {
+        final Configuration conf = getUtility().getConfiguration();
+        Job job = Job.getInstance(conf);
+        PhoenixMapReduceUtil.setInput(job, StockWritable.class, STOCK_TABLE_NAME, RECORDING_YEAR+"  < 2009",
+                STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+        // the condition excludes 2009, so the maximum must come from the 2008 recordings
+        testJob(job, 81.04);
+    }
+
+    /**
+     * Loads the fixture rows, runs {@code job} with the local job runner and verifies that the stats
+     * table holds exactly one row: {@code AAPL} with the expected maximum.
+     *
+     * @param job         job whose input has already been configured by the caller
+     * @param expectedMax maximum recording the job should store for {@code AAPL}
+     */
+    private void testJob(Job job, double expectedMax)
+            throws SQLException, InterruptedException, IOException, ClassNotFoundException {
+        upsertData();
+
+        // Use the local job runner rather than spinning up a MiniMapReduce cluster; this keeps the test
+        // self-contained and lets you set breakpoints in the mapper and reducer.
+        job.getConfiguration().set("mapreduce.framework.name", "local");
+        // setOutput() also installs PhoenixOutputFormat as the job's output format
+        setOutput(job);
+
+        job.setMapperClass(StockMapper.class);
+        job.setReducerClass(StockReducer.class);
+
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(DoubleWritable.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(StockWritable.class);
+
+        // run job
+        assertTrue("Job didn't complete successfully! Check logs for reason.", job.waitForCompletion(true));
+
+        // verify
+        try (Connection conn = DriverManager.getConnection(getUrl());
+             Statement stmt = conn.createStatement();
+             ResultSet stats = stmt.executeQuery("SELECT * FROM " + STOCK_STATS_TABLE_NAME)) {
+            assertTrue("No data stored in stats table!", stats.next());
+            String name = stats.getString(1);
+            double max = stats.getDouble(2);
+            assertEquals("Got the wrong stock name!", "AAPL", name);
+            assertEquals("Max value didn't match the expected!", expectedMax, max, 0);
+            assertFalse("Should only have stored one row in stats table!", stats.next());
+        }
+    }
+
+    /**
+     * Points the job's output at the stats table using a hand-written UPSERT statement, because the
+     * generated output upsert statement is broken (PHOENIX-2677).
+     *
+     * @param job to update
+     */
+    private void setOutput(Job job) {
+        final Configuration configuration = job.getConfiguration();
+        PhoenixConfigurationUtil.setOutputTableName(configuration, STOCK_STATS_TABLE_NAME);
+        configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, "UPSERT into " + STOCK_STATS_TABLE_NAME +
+                " (" + STOCK_NAME + ", " + MAX_RECORDING + ") values (?,?)");
+        job.setOutputFormatClass(PhoenixOutputFormat.class);
+    }
+
+    /**
+     * Inserts four AAPL recordings for each of 2009 and 2008; the largest values are 91.04 (2009)
+     * and 81.04 (2008).
+     */
+    private void upsertData() throws SQLException {
+        try (Connection conn = DriverManager.getConnection(getUrl());
+             PreparedStatement stmt = conn.prepareStatement(UPSERT)) {
+            upsertData(stmt, "AAPL", 2009, new Double[]{85.88, 91.04, 88.5, 90.3});
+            upsertData(stmt, "AAPL", 2008, new Double[]{75.88, 81.04, 78.5, 80.3});
+            conn.commit();
+        }
+    }
+
+    /**
+     * Binds a single stock row to {@code stmt} and executes it; committing is left to the caller.
+     */
+    private void upsertData(PreparedStatement stmt, String name, int year, Double[] data) throws SQLException {
+        int i = 1;
+        stmt.setString(i++, name);
+        stmt.setInt(i++, year);
+        Array recordings = new PhoenixArray.PrimitiveDoublePhoenixArray(PDouble.INSTANCE, data);
+        stmt.setArray(i++, recordings);
+        stmt.execute();
+    }
+
+    /**
+     * Row type used on both sides of the job: {@link #readFields} loads the stock name and its
+     * recordings from the input query, while {@link #write} binds the stock name and the computed
+     * maximum to the stats-table UPSERT.
+     */
+    public static class StockWritable implements DBWritable {
+
+        private String stockName;
+        private double[] recordings;
+        private double maxPrice;
+
+        @Override
+        public void readFields(ResultSet rs) throws SQLException {
+            stockName = rs.getString(STOCK_NAME);
+            final Array recordingsArray = rs.getArray(RECORDINGS_QUARTER);
+            // the array column is nullable; treat a missing array as "no recordings"
+            recordings = recordingsArray == null ? new double[0] : (double[]) recordingsArray.getArray();
+        }
+
+        @Override
+        public void write(PreparedStatement pstmt) throws SQLException {
+            pstmt.setString(1, stockName);
+            pstmt.setDouble(2, maxPrice);
+        }
+
+        public double[] getRecordings() {
+            return recordings;
+        }
+
+        public String getStockName() {
+            return stockName;
+        }
+
+        public void setStockName(String stockName) {
+            this.stockName = stockName;
+        }
+
+        public void setMaxPrice(double maxPrice) {
+            this.maxPrice = maxPrice;
+        }
+    }
+
+    /**
+     * Emits the highest recording of each input row, keyed by stock name. Rows without any
+     * recordings are skipped.
+     */
+    public static class StockMapper extends Mapper<NullWritable, StockWritable, Text, DoubleWritable> {
+
+        private final Text stock = new Text();
+        private final DoubleWritable price = new DoubleWritable();
+
+        @Override
+        protected void map(NullWritable key, StockWritable stockWritable, Context context)
+                throws IOException, InterruptedException {
+            double[] recordings = stockWritable.getRecordings();
+            if (recordings.length == 0) {
+                return;
+            }
+            final String stockName = stockWritable.getStockName();
+            // seed with NEGATIVE_INFINITY, not Double.MIN_VALUE: MIN_VALUE is the smallest *positive*
+            // double, so it would be reported as the max for a row whose recordings are all negative
+            double maxPrice = Double.NEGATIVE_INFINITY;
+            for (double recording : recordings) {
+                if (maxPrice < recording) {
+                    maxPrice = recording;
+                }
+            }
+            stock.set(stockName);
+            price.set(maxPrice);
+            context.write(stock, price);
+        }
+    }
+
+    /**
+     * Reduces the per-row maxima emitted for a stock to its overall maximum and emits it as a
+     * {@link StockWritable} for the stats table.
+     */
+    public static class StockReducer extends Reducer<Text, DoubleWritable, NullWritable, StockWritable> {
+
+        @Override
+        protected void reduce(Text key, Iterable<DoubleWritable> recordings, Context context)
+                throws IOException, InterruptedException {
+            // NEGATIVE_INFINITY rather than Double.MIN_VALUE, which is positive and so not a safe max seed
+            double maxPrice = Double.NEGATIVE_INFINITY;
+            for (DoubleWritable recording : recordings) {
+                if (maxPrice < recording.get()) {
+                    maxPrice = recording.get();
+                }
+            }
+            final StockWritable stock = new StockWritable();
+            stock.setStockName(key.toString());
+            stock.setMaxPrice(maxPrice);
+            context.write(NullWritable.get(), stock);
+        }
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece81b5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index f52c860..125c6a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -30,43 +30,60 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
 public final class PhoenixMapReduceUtil {
 
     private PhoenixMapReduceUtil() {
-        
+
     }
-    
+
     /**
-     * 
+     * Configures {@code job} to read {@code tableName} through {@link PhoenixInputFormat}, projecting
+     * {@code fieldNames} and optionally restricting the rows with {@code conditions}.
+     *
      * @param job
      * @param inputClass DBWritable class
      * @param tableName  Input table name
-     * @param conditions Condition clause to be added to the WHERE clause.
+     * @param conditions Condition clause to be added to the WHERE clause. Can be <tt>null</tt> if there are no conditions.
      * @param fieldNames fields being projected for the SELECT query.
      */
-    public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName , final String conditions, final String... fieldNames) {
-          job.setInputFormatClass(PhoenixInputFormat.class);
-          final Configuration configuration = job.getConfiguration();
-          PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
-          PhoenixConfigurationUtil.setSelectColumnNames(configuration,fieldNames);
-          PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
-          PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.TABLE);
+    public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName,
+                                final String conditions, final String... fieldNames) {
+        final Configuration configuration = setInput(job, inputClass, tableName);
+        if (conditions != null) {
+            PhoenixConfigurationUtil.setInputTableConditions(configuration, conditions);
+        }
+        PhoenixConfigurationUtil.setSelectColumnNames(configuration, fieldNames);
+        // table-based input; the query-based overload sets SchemaType.QUERY instead
+        PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.TABLE);
     }
-       
+
     /**
-     * 
-     * @param job         
-     * @param inputClass  DBWritable class  
+     * Configures {@code job} to read the rows returned by the SELECT query {@code inputQuery} through
+     * {@link PhoenixInputFormat}.
+     *
+     * @param job
+     * @param inputClass  DBWritable class
      * @param tableName   Input table name
      * @param inputQuery  Select query.
      */
     public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName, final String inputQuery) {
-          job.setInputFormatClass(PhoenixInputFormat.class);
-          final Configuration configuration = job.getConfiguration();
-          PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
-          PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
-          PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
+          final Configuration configuration = setInput(job, inputClass, tableName);
+          PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
           PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
-          
      }
-    
+
+    /**
+     * Applies the settings shared by both public {@code setInput} overloads: the input format,
+     * the input table name and the {@link DBWritable} input class.
+     *
+     * @return the job's configuration, for the caller to finish configuring
+     */
+    private static Configuration setInput(final Job job, final Class<? extends DBWritable> inputClass,
+                                          final String tableName) {
+        job.setInputFormatClass(PhoenixInputFormat.class);
+        final Configuration configuration = job.getConfiguration();
+        PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+        PhoenixConfigurationUtil.setInputClass(configuration, inputClass);
+        return configuration;
+    }
+
     /**
      * A method to override which HBase cluster for {@link PhoenixInputFormat} to read from
      * @param job MapReduce Job
@@ -77,10 +94,10 @@ public final class PhoenixMapReduceUtil {
         PhoenixConfigurationUtil.setInputCluster(configuration, quorum);
     }
     /**
-     * 
+     *
      * @param job
-     * @param outputClass  
-     * @param tableName  Output table 
+     * @param outputClass
+     * @param tableName  Output table
      * @param columns    List of columns separated by ,
      */
     public static void setOutput(final Job job, final String tableName,final String columns) {
@@ -89,13 +106,13 @@ public final class PhoenixMapReduceUtil {
         PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
         PhoenixConfigurationUtil.setUpsertColumnNames(configuration,columns.split(","));
     }
-    
-    
+
+
     /**
-     * 
+     *
      * @param job
      * @param outputClass
-     * @param tableName  Output table 
+     * @param tableName  Output table
      * @param fieldNames fields
      */
     public static void setOutput(final Job job, final String tableName , final String... fieldNames) {
@@ -104,7 +121,7 @@ public final class PhoenixMapReduceUtil {
           PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
           PhoenixConfigurationUtil.setUpsertColumnNames(configuration,fieldNames);
     }
-    
+
     /**
      * A method to override which HBase cluster for {@link PhoenixOutputFormat} to write to
      * @param job MapReduce Job
@@ -115,5 +132,5 @@ public final class PhoenixMapReduceUtil {
         PhoenixConfigurationUtil.setOutputCluster(configuration, quorum);
     }
 
-    
+
 }

Reply via email to