Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 e9347fc62 -> ec235e44d


PHOENIX-2926 Skip loading data for a table with local indexes when a split
occurs during the bulk load job (Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ec235e44
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ec235e44
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ec235e44

Branch: refs/heads/4.x-HBase-0.98
Commit: ec235e44dfacc0c24edf5e4c70251d6df4e99eaa
Parents: e9347fc
Author: Rajeshbabu Chintaguntla <[email protected]>
Authored: Wed Jul 6 20:27:12 2016 +0530
Committer: Rajeshbabu Chintaguntla <[email protected]>
Committed: Wed Jul 6 20:27:12 2016 +0530

----------------------------------------------------------------------
 .../phoenix/mapreduce/AbstractBulkLoadTool.java | 55 ++++++++++++++++++--
 1 file changed, 52 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
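
The patch below guards bulk loads of tables with local indexes: it snapshots
the table's region start keys before submitting the MapReduce job and again
after the job succeeds, and if the two snapshots differ, a region split
happened mid-job, so the generated HFiles no longer line up with region
boundaries and the load is aborted with an error. A minimal sketch of that
comparison using HBase's Bytes utility (the helper class and method names here
are illustrative, not part of the commit):

import org.apache.hadoop.hbase.util.Bytes;

// Illustrative helper mirroring the check the commit adds inline: two
// snapshots of a table's region start keys match only if they contain the
// same number of regions and every start key is byte-for-byte identical.
final class SplitKeyCheck {

    static boolean splitKeysMatch(byte[][] before, byte[][] after) {
        if (before == null || after == null || before.length != after.length) {
            return false;
        }
        for (int i = 0; i < before.length; i++) {
            if (Bytes.compareTo(before[i], after[i]) != 0) {
                return false; // a split changed the region boundaries
            }
        }
        return true;
    }
}
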


http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec235e44/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index ad1b691..faa0a6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixDriver;
@@ -57,6 +58,7 @@ import org.apache.phoenix.mapreduce.bulkload.TargetTableRef;
 import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -234,6 +236,15 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         List<TargetTableRef> tablesToBeLoaded = new ArrayList<TargetTableRef>();
         PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
         tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName, table.getPhysicalName().getString()));
+        boolean hasLocalIndexes = false;
+        for(PTable index: table.getIndexes()) {
+            if (index.getIndexType() == IndexType.LOCAL) {
+                hasLocalIndexes =
+                        qualifiedIndexTableName == null ? true : index.getTableName().getString()
+                                .equals(qualifiedIndexTableName);
+                if (hasLocalIndexes) break;
+            }
+        }
         // using conn after it's been closed... o.O
         tablesToBeLoaded.addAll(getIndexTables(conn, schemaName, qualifiedTableName));
 
@@ -254,7 +265,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
             tablesToBeLoaded.add(targetIndexRef);
         }
 
-        return submitJob(conf, tableName, inputPaths, outputPath, tablesToBeLoaded);
+        return submitJob(conf, tableName, inputPaths, outputPath, tablesToBeLoaded, hasLocalIndexes);
     }
 
     /**
@@ -263,7 +274,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
      * @throws Exception 
      */
     public int submitJob(final Configuration conf, final String qualifiedTableName,
-        final String inputPaths, final Path outputPath, List<TargetTableRef> tablesToBeLoaded) throws Exception {
+        final String inputPaths, final Path outputPath, List<TargetTableRef> tablesToBeLoaded, boolean hasLocalIndexes) throws Exception {
        
         Job job = Job.getInstance(conf, "Phoenix MapReduce import for " + qualifiedTableName);
         FileInputFormat.addInputPaths(job, inputPaths);
@@ -275,7 +286,16 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         job.setOutputKeyClass(TableRowkeyPair.class);
         job.setOutputValueClass(KeyValue.class);
         job.setReducerClass(FormatToKeyValueReducer.class);
-
+        byte[][] splitKeysBeforeJob = null;
+        HTable table = null;
+        if(hasLocalIndexes) {
+            try{
+                table = new HTable(job.getConfiguration(), qualifiedTableName);
+                splitKeysBeforeJob = table.getRegionLocator().getStartKeys();
+            } finally {
+                if(table != null )table.close();
+            }
+        }
         MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
 
         final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
@@ -291,6 +311,35 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         boolean success = job.waitForCompletion(true);
 
         if (success) {
+            if (hasLocalIndexes) {
+                byte[][] splitKeysAfterJob = null;
+                try {
+                    table = new HTable(job.getConfiguration(), qualifiedTableName);
+                    splitKeysAfterJob = table.getRegionLocator().getStartKeys();
+                } finally {
+                    if (table != null) table.close();
+                }
+                boolean matchingSplitKeys = true;
+                if (splitKeysBeforeJob != null && splitKeysAfterJob != null
+                        && splitKeysBeforeJob.length == splitKeysAfterJob.length) {
+                    for (int i = 0; i < splitKeysBeforeJob.length; i++) {
+                        if (Bytes.compareTo(splitKeysBeforeJob[i], splitKeysAfterJob[i]) != 0) {
+                            matchingSplitKeys = false;
+                            break;
+                        }
+                    }
+                } else {
+                    matchingSplitKeys = false;
+                }
+                if(!matchingSplitKeys) {
+                    LOG.error("The table "
+                            + qualifiedTableName
+                            + " has local indexes and there is split key mismatch before and"
+                            + " after running bulkload job. Please rerun the job otherwise"
+                            + " there may be inconsistencies between actual data and index data.");
+                    return -1;
+                }
+            }
             LOG.info("Loading HFiles from {}", outputPath);
             completebulkload(conf,outputPath,tablesToBeLoaded);
             LOG.info("Removing output directory {}", outputPath);
