Repository: phoenix
Updated Branches:
  refs/heads/master dbd9143e9 -> d4be4f206
PHOENIX-2926 Skip loading data for table having local indexes when there is split during bulkload job (Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d4be4f20
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d4be4f20
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d4be4f20

Branch: refs/heads/master
Commit: d4be4f20637b2438eb402198f7ddbeef56198e69
Parents: dbd9143
Author: Rajeshbabu Chintaguntla <[email protected]>
Authored: Wed Jul 6 20:23:35 2016 +0530
Committer: Rajeshbabu Chintaguntla <[email protected]>
Committed: Wed Jul 6 20:23:35 2016 +0530

----------------------------------------------------------------------
 .../phoenix/mapreduce/AbstractBulkLoadTool.java | 55 ++++++++++++++++++--
 1 file changed, 52 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d4be4f20/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index ad1b691..faa0a6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixDriver;
@@ -57,6 +58,7 @@ import org.apache.phoenix.mapreduce.bulkload.TargetTableRef;
 import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -234,6 +236,15 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         List<TargetTableRef> tablesToBeLoaded = new ArrayList<TargetTableRef>();
         PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
         tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName, table.getPhysicalName().getString()));
+        boolean hasLocalIndexes = false;
+        for(PTable index: table.getIndexes()) {
+            if (index.getIndexType() == IndexType.LOCAL) {
+                hasLocalIndexes =
+                        qualifiedIndexTableName == null ? true : index.getTableName().getString()
+                                .equals(qualifiedIndexTableName);
+                if (hasLocalIndexes) break;
+            }
+        }
         // using conn after it's been closed... o.O
         tablesToBeLoaded.addAll(getIndexTables(conn, schemaName, qualifiedTableName));
 
@@ -254,7 +265,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
             tablesToBeLoaded.add(targetIndexRef);
         }
 
-        return submitJob(conf, tableName, inputPaths, outputPath, tablesToBeLoaded);
+        return submitJob(conf, tableName, inputPaths, outputPath, tablesToBeLoaded, hasLocalIndexes);
     }
 
     /**
@@ -263,7 +274,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
      * @throws Exception
      */
     public int submitJob(final Configuration conf, final String qualifiedTableName,
-            final String inputPaths, final Path outputPath, List<TargetTableRef> tablesToBeLoaded) throws Exception {
+            final String inputPaths, final Path outputPath, List<TargetTableRef> tablesToBeLoaded, boolean hasLocalIndexes) throws Exception {
 
         Job job = Job.getInstance(conf, "Phoenix MapReduce import for " + qualifiedTableName);
         FileInputFormat.addInputPaths(job, inputPaths);
@@ -275,7 +286,16 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         job.setOutputKeyClass(TableRowkeyPair.class);
         job.setOutputValueClass(KeyValue.class);
         job.setReducerClass(FormatToKeyValueReducer.class);
-
+        byte[][] splitKeysBeforeJob = null;
+        HTable table = null;
+        if(hasLocalIndexes) {
+            try{
+                table = new HTable(job.getConfiguration(), qualifiedTableName);
+                splitKeysBeforeJob = table.getRegionLocator().getStartKeys();
+            } finally {
+                if(table != null )table.close();
+            }
+        }
         MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
 
         final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
@@ -291,6 +311,35 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         boolean success = job.waitForCompletion(true);
 
         if (success) {
+            if (hasLocalIndexes) {
+                byte[][] splitKeysAfterJob = null;
+                try {
+                    table = new HTable(job.getConfiguration(), qualifiedTableName);
+                    splitKeysAfterJob = table.getRegionLocator().getStartKeys();
+                } finally {
+                    if (table != null) table.close();
+                }
+                boolean matchingSplitKeys = true;
+                if (splitKeysBeforeJob != null && splitKeysAfterJob != null
+                        && splitKeysBeforeJob.length == splitKeysAfterJob.length) {
+                    for (int i = 0; i < splitKeysBeforeJob.length; i++) {
+                        if (Bytes.compareTo(splitKeysBeforeJob[i], splitKeysAfterJob[i]) != 0) {
+                            matchingSplitKeys = false;
+                            break;
+                        }
+                    }
+                } else {
+                    matchingSplitKeys = false;
+                }
+                if(!matchingSplitKeys) {
+                    LOG.error("The table "
+                            + qualifiedTableName
+                            + " has local indexes and there is split key mismatch before and"
+                            + " after running bulkload job. Please rerun the job otherwise"
+                            + " there may be inconsistencies between actual data and index data.");
+                    return -1;
+                }
+            }
             LOG.info("Loading HFiles from {}", outputPath);
             completebulkload(conf,outputPath,tablesToBeLoaded);
             LOG.info("Removing output directory {}", outputPath);
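
----------------------------------------------------------------------

For readers who want the guard in isolation: the patch snapshots the table's region start keys before submitting the MapReduce job and again after it completes, and aborts the HFile load on any mismatch, since the HFiles written for local indexes are partitioned against the region boundaries that existed while the job ran. Below is a minimal standalone sketch of that comparison, not part of the commit; it uses the non-deprecated Connection/RegionLocator HBase 1.x API instead of the HTable constructor the patch uses, and the class name SplitKeySnapshot and table name "MY_TABLE" are illustrative assumptions.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.util.Bytes;

    // Illustrative helper, not part of the commit.
    public class SplitKeySnapshot {

        /** Snapshot the current region start keys of a table. */
        public static byte[][] startKeys(Configuration conf, String tableName) throws Exception {
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 RegionLocator locator = conn.getRegionLocator(TableName.valueOf(tableName))) {
                return locator.getStartKeys();
            }
        }

        /**
         * Mirrors the patch's check: the snapshots match only when both exist,
         * have the same region count, and every start key is byte-identical.
         * Any mismatch implies the table split (or merged) while the job ran.
         */
        public static boolean matchingSplitKeys(byte[][] before, byte[][] after) {
            if (before == null || after == null || before.length != after.length) {
                return false;
            }
            for (int i = 0; i < before.length; i++) {
                if (Bytes.compareTo(before[i], after[i]) != 0) {
                    return false;
                }
            }
            return true;
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            byte[][] before = startKeys(conf, "MY_TABLE");   // before submitting the job
            // ... run the bulk-load MapReduce job here ...
            byte[][] after = startKeys(conf, "MY_TABLE");    // after the job completes
            if (!matchingSplitKeys(before, after)) {
                System.err.println("Region boundaries changed during the job; rerun the bulk load.");
            }
        }
    }

Comparing the full start-key arrays rather than just the region count also catches a split followed by a merge, which can leave the count unchanged while still moving boundaries.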
