Repository: phoenix
Updated Branches:
  refs/heads/master 59a7dd138 -> c2d33ed38


PHOENIX-4997 Phoenix MR on snapshots can produce duplicate rows

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c2d33ed3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c2d33ed3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c2d33ed3

Branch: refs/heads/master
Commit: c2d33ed384467fea4655e6f49ce43834f3886409
Parents: 59a7dd1
Author: Karan Mehta <[email protected]>
Authored: Thu Nov 1 17:15:26 2018 -0700
Committer: Karan Mehta <[email protected]>
Committed: Thu Nov 1 17:56:25 2018 -0700

----------------------------------------------------------------------
 .../end2end/TableSnapshotReadsMapReduceIT.java  | 123 +++++++++++--------
 .../iterate/MapReduceParallelScanGrouper.java   |  34 ++++-
 .../iterate/TableSnapshotResultIterator.java    |  30 +++--
 .../java/org/apache/phoenix/query/BaseTest.java |  12 +-
 4 files changed, 126 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
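
Background on the bug: when an HBase region is split, the parent region is
kept around in an offline state until its daughters are compacted, and a
snapshot taken in that window lists the offline parent alongside both
daughters in its manifest. Scanning the parent as well as the daughters
covers the split key range twice, which is how MR jobs over snapshots could
emit duplicate rows. A minimal, self-contained sketch of the filter idea this
commit applies (class name, table name, and split key below are illustrative,
not part of the commit):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.RegionInfo;
    import org.apache.hadoop.hbase.client.RegionInfoBuilder;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SplitParentFilterSketch {

      // Same check the commit adds in two places below: an offline split
      // parent must be skipped, because its daughters cover the identical
      // key range and every row would otherwise be read twice.
      static boolean isValidRegion(RegionInfo hri) {
        if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
          return false;
        }
        return true;
      }

      public static void main(String[] args) {
        TableName table = TableName.valueOf("T");
        // Offline parent covering the full key space, split at "BBBB"
        RegionInfo parent = RegionInfoBuilder.newBuilder(table)
            .setSplit(true).setOffline(true).build();
        RegionInfo daughterA = RegionInfoBuilder.newBuilder(table)
            .setEndKey(Bytes.toBytes("BBBB")).build();
        RegionInfo daughterB = RegionInfoBuilder.newBuilder(table)
            .setStartKey(Bytes.toBytes("BBBB")).build();

        List<RegionInfo> regions = new ArrayList<>();
        for (RegionInfo hri : new RegionInfo[] { parent, daughterA, daughterB }) {
          if (isValidRegion(hri)) {
            regions.add(hri);
          }
        }
        regions.sort(RegionInfo.COMPARATOR);
        // Only the two daughters survive; the key space is covered exactly once.
        System.out.println(regions.size()); // prints 2
      }
    }
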

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2d33ed3/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
index fcf89a0..4aaeef2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
@@ -36,6 +36,7 @@ import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
@@ -47,14 +48,21 @@ import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable;
 import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
+
+  private static final Logger logger = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class);
+
   private final static String SNAPSHOT_NAME = "FOO";
   private static final String FIELD1 = "FIELD1";
   private static final String FIELD2 = "FIELD2";
@@ -66,6 +74,9 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
   private static List<List<Object>> result;
   private long timestamp;
   private String tableName;
+  private Job job;
+  private Path tmpDir;
+  private Configuration conf;
 
   @BeforeClass
   public static void doSetup() throws Exception {
@@ -73,8 +84,8 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
 
-  @Test
-  public void testMapReduceSnapshots() throws Exception {
+  @Before
+  public void before() throws SQLException, IOException {
     // create table
     Connection conn = DriverManager.getConnection(getUrl());
     tableName = generateUniqueName();
@@ -82,58 +93,43 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
     conn.commit();
 
     // configure Phoenix M/R job to read snapshot
-    final Configuration conf = getUtility().getConfiguration();
-    Job job = Job.getInstance(conf);
-    Path tmpDir = getUtility().getRandomDir();
+    conf = getUtility().getConfiguration();
+    job = Job.getInstance(conf);
+    tmpDir = getUtility().getRandomDir();
+  }
 
-    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, null, FIELD1, FIELD2, FIELD3);
+  @Test
+  public void testMapReduceSnapshots() throws Exception {
+    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,
+        SNAPSHOT_NAME, tableName, tmpDir, null, FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+  }
 
-    // configure and test job
-    configureJob(job, tableName, null, null);
+  @Test
+  public void testMapReduceSnapshotsMultiRegion() throws Exception {
+    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,
+        SNAPSHOT_NAME, tableName, tmpDir, null, FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, true);
   }
 
   @Test
   public void testMapReduceSnapshotsWithCondition() throws Exception {
-    // create table
-    Connection conn = DriverManager.getConnection(getUrl());
-    tableName = generateUniqueName();
-    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
-    conn.commit();
-
-    // configure Phoenix M/R job to read snapshot
-    final Configuration conf = getUtility().getConfiguration();
-    Job job = Job.getInstance(conf);
-    Path tmpDir = getUtility().getRandomDir();
-    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3);
-
-    // configure and test job
-    configureJob(job, tableName, null, "FIELD3 > 0001");
-
+    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,
+        SNAPSHOT_NAME, tableName, tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, "FIELD3 > 0001", false);
   }
 
   @Test
   public void testMapReduceSnapshotWithLimit() throws Exception {
-    // create table
-    Connection conn = DriverManager.getConnection(getUrl());
-    tableName = generateUniqueName();
-    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
-    conn.commit();
-
-    // configure Phoenix M/R job to read snapshot
-    final Configuration conf = getUtility().getConfiguration();
-    Job job = Job.getInstance(conf);
-    Path tmpDir = getUtility().getRandomDir();
-
     // Running limit with order by on non pk column
     String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1";
-    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir,inputQuery);
-
-    // configure and test job
-    configureJob(job, tableName, inputQuery, null);
+    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,
+        SNAPSHOT_NAME, tableName, tmpDir, inputQuery);
+    configureJob(job, tableName, inputQuery, null, false);
   }
 
-  private void configureJob(Job job, String tableName, String inputQuery, String condition) throws Exception {
+  private void configureJob(Job job, String tableName, String inputQuery, String condition, boolean shouldSplit) throws Exception {
     try {
-      upsertAndSnapshot(tableName);
+      upsertAndSnapshot(tableName, shouldSplit);
       result = new ArrayList<>();
 
       job.setMapperClass(TableSnapshotMapper.class);
@@ -151,6 +147,7 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
       if (condition != null) {
         selectQuery.append(" WHERE " + condition);
       }
+
       if (inputQuery == null)
         inputQuery = selectQuery.toString();
@@ -176,12 +173,13 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
   private void upsertData(String tableName) throws SQLException {
     Connection conn = DriverManager.getConnection(getUrl());
     PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
-    upsertData(stmt, "CCCC", "SSDD", 0001);
-    upsertData(stmt, "CCCC", "HDHG", 0005);
-    upsertData(stmt, "BBBB", "JSHJ", 0002);
-    upsertData(stmt, "AAAA", "JHHD", 0003);
+    upsertData(stmt, "AAAA", "JHHD", 37);
+    upsertData(stmt, "BBBB", "JSHJ", 224);
+    upsertData(stmt, "CCCC", "SSDD", 15);
+    upsertData(stmt, "PPPP", "AJDG", 53);
+    upsertData(stmt, "SSSS", "HSDG", 59);
+    upsertData(stmt, "XXXX", "HDPP", 22);
     conn.commit();
-    timestamp = System.currentTimeMillis();
   }
 
   private void upsertData(PreparedStatement stmt, String field1, String field2, int field3) throws SQLException {
@@ -191,31 +189,52 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
     stmt.execute();
   }
 
-  public void upsertAndSnapshot(String tableName) throws Exception {
+  private void upsertAndSnapshot(String tableName, boolean shouldSplit) throws Exception {
     upsertData(tableName);
 
+    TableName hbaseTableName = TableName.valueOf(tableName);
     Connection conn = DriverManager.getConnection(getUrl());
     Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
-    admin.snapshot(SNAPSHOT_NAME, TableName.valueOf(tableName));
-    // call flush to create new files in the region
-    admin.flush(TableName.valueOf(tableName));
+
+    if (shouldSplit) {
+      splitTableSync(admin, hbaseTableName, "BBBB".getBytes(), 2);
+    }
+
+    admin.snapshot(SNAPSHOT_NAME, hbaseTableName);
 
     List<SnapshotDescription> snapshots = admin.listSnapshots();
     Assert.assertEquals(tableName, snapshots.get(0).getTable());
 
+    // Capture the snapshot timestamp to use as SCN while reading the table later
+    // Assigning the timestamp value here will make tests less flaky
+    timestamp = System.currentTimeMillis();
+
     // upsert data after snapshot
     PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
-    upsertData(stmt, "DDDD", "SNFB", 0004);
+    upsertData(stmt, "DDDD", "SNFB", 45);
     conn.commit();
   }
 
-  public void deleteSnapshot(String tableName) throws Exception {
+  private void splitTableSync(Admin admin, TableName hbaseTableName,
+      byte[] splitPoint , int expectedRegions) throws IOException, InterruptedException {
+    admin.split(hbaseTableName, splitPoint);
+    for (int i = 0; i < 100; i++) {
+      List<HRegionInfo> hRegionInfoList = admin.getTableRegions(hbaseTableName);
+      if (hRegionInfoList.size() >= expectedRegions) {
+        break;
+      }
+      logger.info("Sleeping for 1000 ms while waiting for " + hbaseTableName.getNameAsString() + " to split");
+      Thread.sleep(1000);
+    }
+  }
+
+  private void deleteSnapshot(String tableName) throws Exception {
     try (Connection conn = DriverManager.getConnection(getUrl());
         Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) {
       admin.deleteSnapshot(SNAPSHOT_NAME);
     }
-  }
+  }
 
   public static class TableSnapshotMapper
       extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> {
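
Note on the timestamp change above: the test now records timestamp
immediately after the snapshot is taken and before the post-snapshot upsert,
and, per the in-code comment, uses it as the SCN when reading the table back.
A sketch of reading at that SCN through Phoenix (the JDBC URL is
illustrative; timestamp and tableName are the test fields above):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.util.Properties;

    import org.apache.phoenix.util.PhoenixRuntime;

    // A connection pinned at the captured timestamp sees the six
    // pre-snapshot rows but not the later "DDDD" upsert.
    Properties props = new Properties();
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp));
    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
         ResultSet rs = conn.createStatement().executeQuery(
             "SELECT COUNT(*) FROM " + tableName)) {
      rs.next();
      // rs.getLong(1) == 6 under the assumptions above
    }
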

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2d33ed3/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
index f25d89d..11dfb00 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.iterate;
 
 import java.sql.SQLException;
+import java.util.Collections;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
@@ -28,6 +29,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
@@ -46,7 +49,7 @@ public class MapReduceParallelScanGrouper implements ParallelScanGrouper {
 
   private static final MapReduceParallelScanGrouper INSTANCE = new MapReduceParallelScanGrouper();
 
-  public static MapReduceParallelScanGrouper getInstance() { 
+  public static MapReduceParallelScanGrouper getInstance() {
     return INSTANCE;
   }
 
@@ -80,18 +83,39 @@ public class MapReduceParallelScanGrouper implements ParallelScanGrouper {
     }
   }
 
+  /**
+   * Get list of region locations from SnapshotManifest
+   * BaseResultIterators assume that regions are sorted using RegionInfo.COMPARATOR
+   */
   private List<HRegionLocation> getRegionLocationsFromManifest(SnapshotManifest manifest) {
     List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
     Preconditions.checkNotNull(regionManifests);
 
-    List<HRegionLocation> regionLocations = Lists.newArrayListWithCapacity(regionManifests.size());
+    List<RegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());
+    List<HRegionLocation> hRegionLocations = Lists.newArrayListWithCapacity(regionManifests.size());
 
     for (SnapshotRegionManifest regionManifest : regionManifests) {
-      regionLocations.add(new HRegionLocation(
-          ProtobufUtil.toRegionInfo(regionManifest.getRegionInfo()), null));
+      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionManifest.getRegionInfo());
+      if (isValidRegion(regionInfo)) {
+        regionInfos.add(regionInfo);
+      }
+    }
+
+    regionInfos.sort(RegionInfo.COMPARATOR);
+
+    for (RegionInfo regionInfo : regionInfos) {
+      hRegionLocations.add(new HRegionLocation(regionInfo, null));
     }
 
-    return regionLocations;
+    return hRegionLocations;
+  }
+
+  // Exclude offline split parent regions
+  private boolean isValidRegion(RegionInfo hri) {
+    if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
+      return false;
+    }
+    return true;
   }
 
   private String getSnapshotName(Configuration conf) {
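
The ordering in getRegionLocationsFromManifest matters as much as the filter:
per the new javadoc, BaseResultIterators assumes regions arrive sorted by
RegionInfo.COMPARATOR, so the sort has to happen after invalid regions are
dropped and before the HRegionLocation wrapping. An equivalent stream
formulation of the new method body, shown only to make that filter-sort-wrap
pipeline explicit (the commit itself uses plain loops):

    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.client.RegionInfo;

    static List<HRegionLocation> toSortedLocations(List<RegionInfo> regionInfos) {
      return regionInfos.stream()
          // drop offline split parents, as isValidRegion does
          .filter(hri -> !(hri.isOffline() && (hri.isSplit() || hri.isSplitParent())))
          // BaseResultIterators expects this ordering
          .sorted(RegionInfo.COMPARATOR)
          // snapshot regions have no live server, hence the null ServerName
          .map(hri -> new HRegionLocation(hri, null))
          .collect(Collectors.toList());
    }
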

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2d33ed3/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
index 31746ce..1d490c1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -79,20 +80,29 @@ public class TableSnapshotResultIterator implements ResultIterator {
     RestoreSnapshotHelper.RestoreMetaChanges meta =
         RestoreSnapshotHelper.copySnapshotForScanner(this.configuration, this.fs,
             this.rootDir, this.restoreDir, this.snapshotName);
-    List restoredRegions = meta.getRegionsToAdd();
+    List<RegionInfo> restoredRegions = meta.getRegionsToAdd();
     this.htd = meta.getTableDescriptor();
-    this.regions = new ArrayList(restoredRegions.size());
-    Iterator i$ = restoredRegions.iterator();
-
-    while(i$.hasNext()) {
-      RegionInfo hri = (RegionInfo)i$.next();
-      if(CellUtil.overlappingKeys(this.scan.getStartRow(), this.scan.getStopRow(),
-          hri.getStartKey(), hri.getEndKey())) {
-        this.regions.add(hri);
+    this.regions = new ArrayList<>(restoredRegions.size());
+
+    for (RegionInfo restoredRegion : restoredRegions) {
+      if (isValidRegion(restoredRegion)) {
+        this.regions.add(restoredRegion);
       }
     }
-    Collections.sort(this.regions,RegionInfo.COMPARATOR);
+    this.regions.sort(RegionInfo.COMPARATOR);
+  }
+
+  /**
+   * Exclude offline split parent regions and
+   * regions that don't intersect with provided scan
+   */
+  private boolean isValidRegion(RegionInfo hri) {
+    if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
+      return false;
+    }
+    return PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
+        hri.getStartKey(), hri.getEndKey());
   }
 
   public boolean initSnapshotScanner() throws SQLException {
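
TableSnapshotResultIterator applies the same offline-parent check and keeps
the existing scan-intersection test, now via PrivateCellUtil (an
HBase-internal utility that replaces the deprecated CellUtil variant in
HBase 2.x). overlappingKeys treats an empty byte array as an open bound, so
an unbounded scan intersects every restored region, while region end keys
stay exclusive. Two illustrative cases (keys are made up):

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.PrivateCellUtil;
    import org.apache.hadoop.hbase.util.Bytes;

    public class OverlapSketch {
      public static void main(String[] args) {
        byte[] open = HConstants.EMPTY_BYTE_ARRAY; // empty array = unbounded
        // Scan ["CCCC", unbounded) vs region ["BBBB", "DDDD"): overlaps
        System.out.println(PrivateCellUtil.overlappingKeys(
            Bytes.toBytes("CCCC"), open,
            Bytes.toBytes("BBBB"), Bytes.toBytes("DDDD"))); // true
        // Scan ["DDDD", unbounded) vs the same region: end key is exclusive
        System.out.println(PrivateCellUtil.overlappingKeys(
            Bytes.toBytes("DDDD"), open,
            Bytes.toBytes("BBBB"), Bytes.toBytes("DDDD"))); // false
      }
    }
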

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2d33ed3/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 844d6a1..13d2881 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -1781,15 +1781,15 @@ public abstract class BaseTest {
 
     /**
-     * Split SYSTEM.CATALOG at the given split point
+     * Synchronously split table at the given split point
      */
-    protected static void splitRegion(byte[] splitPoint) throws SQLException, IOException, InterruptedException {
+    protected static void splitRegion(TableName fullTableName, byte[] splitPoint) throws SQLException, IOException, InterruptedException {
         Admin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
-        admin.split(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME, splitPoint);
+        admin.split(fullTableName, splitPoint);
         // make sure the split finishes (there's no synchronous splitting before HBase 2.x)
-        admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME);
-        admin.enableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME);
+        admin.disableTable(fullTableName);
+        admin.enableTable(fullTableName);
     }
 
     /**
@@ -1818,7 +1818,7 @@ public abstract class BaseTest {
         AssignmentManager am = master.getAssignmentManager();
         // No need to split on the first splitPoint since the end key of region boundaries are exclusive
         for (int i=1; i<splitPoints.size(); ++i) {
-            splitRegion(splitPoints.get(i));
+            splitRegion(fullTableName, splitPoints.get(i));
        }
        HashMap<ServerName, List<HRegionInfo>> serverToRegionsList = Maps.newHashMapWithExpectedSize(NUM_SLAVES_BASE);
        Deque<ServerName> availableRegionServers = new ArrayDeque<ServerName>(NUM_SLAVES_BASE);
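
The BaseTest change generalizes splitRegion from SYSTEM.CATALOG to any
table; the disable/enable round trip is what forces the split to complete,
since, as the in-code comment notes, there is no synchronous splitting
before HBase 2.x. This commit thus carries two synchronization styles:
BaseTest bounces the table, while the IT above polls getTableRegions in
splitTableSync. A usage sketch from a test subclassing BaseTest (table name
and split key are illustrative):

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.util.Bytes;

    // Split MY_TABLE at row key "m" and wait for the split to finish
    splitRegion(TableName.valueOf("MY_TABLE"), Bytes.toBytes("m"));
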
