PHOENIX-2292 Improve performance of direct HBase API index build (Ravi Kishore Valeti)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8958431b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8958431b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8958431b

Branch: refs/heads/txn
Commit: 8958431b379e76d14b30d1641803eab5c89957db
Parents: 2e848f6
Author: Thomas D'Silva <[email protected]>
Authored: Thu Oct 15 15:37:13 2015 -0700
Committer: Thomas D'Silva <[email protected]>
Committed: Thu Oct 15 15:41:08 2015 -0700

----------------------------------------------------------------------
 .../mapreduce/index/DirectHTableWriter.java     | 10 +--
 .../index/PhoenixIndexImportDirectMapper.java   | 92 +++++++++++++++-----
 2 files changed, 73 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8958431b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
index c9512c2..d18fde9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
@@ -18,14 +18,13 @@
 package org.apache.phoenix.mapreduce.index;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,10 +94,9 @@ public class DirectHTableWriter {
         }
     }
 
-    public void write(Mutation mutation) throws IOException {
-        if (mutation instanceof Put) this.table.put(new Put((Put) mutation));
-        else if (mutation instanceof Delete) this.table.delete(new Delete((Delete) mutation));
-        else throw new IOException("Pass a Delete or a Put");
+    public void write(List<Mutation> mutations) throws IOException, InterruptedException {
+        Object[] results = new Object[mutations.size()];
+        table.batch(mutations, results);
     }
 
     protected Configuration getConf() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8958431b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index addbcae..32b66f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -33,10 +33,14 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.PhoenixJobCounters;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.slf4j.Logger;
@@ -60,6 +64,10 @@ public class PhoenixIndexImportDirectMapper extends
 
     private DirectHTableWriter writer;
 
+    private int batchSize;
+
+    private MutationState mutationState;
+
     @Override
     protected void setup(final Context context) throws IOException, InterruptedException {
         super.setup(context);
@@ -68,8 +76,7 @@ public class PhoenixIndexImportDirectMapper extends
 
         try {
             indxTblColumnMetadata =
-                    PhoenixConfigurationUtil
-                            .getUpsertColumnMetadataList(configuration);
+                    PhoenixConfigurationUtil.getUpsertColumnMetadataList(configuration);
             indxWritable.setColumnMetadata(indxTblColumnMetadata);
 
             final Properties overrideProps = new Properties();
@@ -77,6 +84,14 @@ public class PhoenixIndexImportDirectMapper extends
                 configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
             connection = ConnectionUtil.getOutputConnection(configuration, overrideProps);
             connection.setAutoCommit(false);
+            // Get BatchSize
+            ConnectionQueryServices services = ((PhoenixConnection) connection).getQueryServices();
+            int maxSize =
+                    services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
+                        QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+            batchSize = Math.min(((PhoenixConnection) connection).getMutateBatchSize(), maxSize);
+            LOG.info("Mutation Batch Size = " + batchSize);
+
+            final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
             this.pStatement = connection.prepareStatement(upsertQuery);
 
@@ -98,17 +113,22 @@ public class PhoenixIndexImportDirectMapper extends
             this.pStatement.execute();
 
             final PhoenixConnection pconn = connection.unwrap(PhoenixConnection.class);
-            final Iterator<Pair<byte[], List<Mutation>>> iterator =
-                    pconn.getMutationState().toMutations(true);
+            MutationState currentMutationState = pconn.getMutationState();
+            if (mutationState == null) {
+                mutationState = currentMutationState;
+                return;
+            }
+            // Keep accumulating Mutations till batch size
+            mutationState.join(currentMutationState);
 
-            while (iterator.hasNext()) {
-                Pair<byte[], List<Mutation>> mutationPair = iterator.next();
-                for (Mutation mutation : mutationPair.getSecond()) {
-                    writer.write(mutation);
-                }
-                context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(1);
+            // Write Mutation Batch
+            if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize == 0) {
+                writeBatch(mutationState, context);
+                mutationState = null;
             }
-            connection.rollback();
+
+            // Make sure progress is reported to Application Master.
+            context.progress();
         } catch (SQLException e) {
             LOG.error(" Error {}  while read/write of a record ", e.getMessage());
             context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
             context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
@@ -116,21 +136,47 @@ public class PhoenixIndexImportDirectMapper extends
         }
     }
 
+    private void writeBatch(MutationState mutationState, Context context) throws IOException,
+            SQLException, InterruptedException {
+        final Iterator<Pair<byte[], List<Mutation>>> iterator = mutationState.toMutations(true);
+        while (iterator.hasNext()) {
+            Pair<byte[], List<Mutation>> mutationPair = iterator.next();
+
+            writer.write(mutationPair.getSecond());
+            context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(
+                mutationPair.getSecond().size());
+        }
+        connection.rollback();
+    }
+
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-        // We are writing some dummy key-value as map output here so that we commit only one
-        // output to reducer.
-        context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()),
-            new IntWritable(0));
-        super.cleanup(context);
-        if (connection != null) {
-            try {
-                connection.close();
-            } catch (SQLException e) {
-                LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
-                    e.getMessage());
+        try {
+            // Write the last & final Mutation Batch
+            if (mutationState != null) {
+                writeBatch(mutationState, context);
+            }
+            // We are writing some dummy key-value as map output here so that we commit only one
+            // output to reducer.
+            context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()),
+                new IntWritable(0));
+            super.cleanup(context);
+        } catch (SQLException e) {
+            LOG.error(" Error {}  while read/write of a record ", e.getMessage());
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            throw new RuntimeException(e);
+        } finally {
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (SQLException e) {
+                    LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
+                        e.getMessage());
+                }
+            }
+            if (writer != null) {
+                writer.close();
             }
         }
-        writer.close();
     }
 }

Reply via email to