Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.1 ad03c4b89 -> 02cc2d0fd
PHOENIX-2926 Skip loading data for table having local indexes when there is split during bulkload job-addendum(Rajeshbabu) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/02cc2d0f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/02cc2d0f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/02cc2d0f Branch: refs/heads/4.x-HBase-1.1 Commit: 02cc2d0fd926c54b2f7e6fb4f0e7dcb2fb3718f6 Parents: ad03c4b Author: Rajeshbabu Chintaguntla <[email protected]> Authored: Thu Jul 28 21:45:37 2016 +0530 Committer: Rajeshbabu Chintaguntla <[email protected]> Committed: Thu Jul 28 21:45:37 2016 +0530 ---------------------------------------------------------------------- .../phoenix/mapreduce/AbstractBulkLoadTool.java | 32 ++++++-------------- .../phoenix/mapreduce/index/IndexTool.java | 21 +++++++++++-- .../java/org/apache/phoenix/util/IndexUtil.java | 13 ++++++++ 3 files changed, 42 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/02cc2d0f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java index faa0a6e..b32f9c6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.mapreduce; +import java.io.IOException; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -60,6 +61,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; 
import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; @@ -312,33 +314,19 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { if (success) { if (hasLocalIndexes) { - byte[][] splitKeysAfterJob = null; try { table = new HTable(job.getConfiguration(), qualifiedTableName); - splitKeysAfterJob = table.getRegionLocator().getStartKeys(); + if(!IndexUtil.matchingSplitKeys(splitKeysBeforeJob, table.getRegionLocator().getStartKeys())) { + LOG.error("The table " + + qualifiedTableName + + " has local indexes and there is split key mismatch before and" + + " after running bulkload job. Please rerun the job otherwise" + + " there may be inconsistencies between actual data and index data."); + return -1; + } } finally { if (table != null) table.close(); } - boolean matchingSplitKeys = true; - if (splitKeysBeforeJob != null && splitKeysAfterJob != null - && splitKeysBeforeJob.length == splitKeysAfterJob.length) { - for (int i = 0; i < splitKeysBeforeJob.length; i++) { - if (Bytes.compareTo(splitKeysBeforeJob[i], splitKeysAfterJob[i]) != 0) { - matchingSplitKeys = false; - break; - } - } - } else { - matchingSplitKeys = false; - } - if(!matchingSplitKeys) { - LOG.error("The table " - + qualifiedTableName - + " has local indexes and there is split key mismatch before and" - + " after running bulkload job. 
Please rerun the job otherwise" - + " there may be inconsistencies between actual data and index data."); - return -1; - } } LOG.info("Loading HFiles from {}", outputPath); completebulkload(conf,outputPath,tablesToBeLoaded); http://git-wip-us.apache.org/repos/asf/phoenix/blob/02cc2d0f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 34c9013..82b353c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -63,6 +63,7 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; @@ -205,8 +206,10 @@ public class IndexTool extends Configured implements Tool { // check if the index type is LOCAL, if so, derive and set the physicalIndexName that is // computed from the qDataTable name. 
String physicalIndexTable = pindexTable.getPhysicalName().getString(); + boolean isLocalIndexBuild = false; if (IndexType.LOCAL.equals(pindexTable.getIndexType())) { physicalIndexTable = qDataTable; + isLocalIndexBuild = true; } final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class); @@ -247,7 +250,7 @@ public class IndexTool extends Configured implements Tool { configureSubmittableJobUsingDirectApi(job, outputPath, cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt())); } else { - configureRunnableJobUsingBulkLoad(job, outputPath); + configureRunnableJobUsingBulkLoad(job, outputPath, isLocalIndexBuild); // Without direct API, we need to update the index state to ACTIVE from client. IndexToolUtil.updateIndexState(connection, qDataTable, indexTable, PIndexState.ACTIVE); @@ -276,7 +279,7 @@ public class IndexTool extends Configured implements Tool { * @return * @throws Exception */ - private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath) throws Exception { + private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath, boolean isLocalIndexBuild) throws Exception { job.setMapperClass(PhoenixIndexImportMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); @@ -285,11 +288,25 @@ public class IndexTool extends Configured implements Tool { PhoenixConfigurationUtil.getPhysicalTableName(configuration); final HTable htable = new HTable(configuration, physicalIndexTable); HFileOutputFormat.configureIncrementalLoad(job, htable); + byte[][] splitKeysBeforeJob = null; + if(isLocalIndexBuild) { + splitKeysBeforeJob = htable.getRegionLocator().getStartKeys(); + } boolean status = job.waitForCompletion(true); if (!status) { LOG.error("IndexTool job failed!"); htable.close(); throw new Exception("IndexTool job failed: " + job.toString()); + } else { + if (isLocalIndexBuild + && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, htable.getRegionLocator() + .getStartKeys())) { 
+ String errMsg = "The index to build is local index and the split keys are not matching" + + " before and after running the job. Please rerun the job otherwise" + + " there may be inconsistencies between actual data and index data"; + LOG.error(errMsg); + throw new Exception(errMsg); + } } LOG.info("Loading HFiles from {}", outputPath); http://git-wip-us.apache.org/repos/asf/phoenix/blob/02cc2d0f/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index b0abe36..9089b68 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -673,4 +673,17 @@ public class IndexUtil { HConstants.NO_NONCE, HConstants.NO_NONCE); } + public static boolean matchingSplitKeys(byte[][] splitKeys1, byte[][] splitKeys2) throws IOException { + if (splitKeys1 != null && splitKeys2 != null + && splitKeys1.length == splitKeys2.length) { + for (int i = 0; i < splitKeys1.length; i++) { + if (Bytes.compareTo(splitKeys1[i], splitKeys2[i]) != 0) { + return false; + } + } + } else { + return false; + } + return true; + } }
