Revert "PHOENIX-4141 Fix flapping TableSnapshotReadsMapReduceIT" This reverts commit b33131d9157f90301dd791c88ca89d2041d37c61.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6a4dd52d Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6a4dd52d Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6a4dd52d Branch: refs/heads/4.x-HBase-1.2 Commit: 6a4dd52dccd6a27f05f936d410ccd306a6079227 Parents: d2d8d9a Author: Samarth Jain <[email protected]> Authored: Sat Sep 2 15:46:27 2017 -0700 Committer: Samarth Jain <[email protected]> Committed: Sat Sep 2 15:46:27 2017 -0700 ---------------------------------------------------------------------- .../end2end/TableSnapshotReadsMapReduceIT.java | 402 +++++++++---------- 1 file changed, 182 insertions(+), 220 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a4dd52d/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java index 591f028..4cc2a20 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java @@ -18,25 +18,11 @@ package org.apache.phoenix.end2end; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.UUID; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; + import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.io.NullWritable; @@ -46,215 +32,191 @@ import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable; import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; -import org.apache.phoenix.util.EnvironmentEdge; -import org.apache.phoenix.util.EnvironmentEdgeManager; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT { - private final static String SNAPSHOT_NAME = "FOO"; - private static final String FIELD1 = "FIELD1"; - private static final String FIELD2 = "FIELD2"; - private static final String FIELD3 = "FIELD3"; - private String CREATE_TABLE = - "CREATE TABLE IF NOT EXISTS %s ( " - + " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))"; - private String UPSERT = "UPSERT into %s values (?, ?, ?)"; - - private static List<List<Object>> result; - private String tableName; - private MyClock clock; - - @Before - public void injectMyClock() { - clock = new MyClock(1000); - // Use our own clock to prevent race between partial rebuilder and compaction - EnvironmentEdgeManager.injectEdge(clock); - } +import org.junit.*; - @After - public void removeMyClock() { - EnvironmentEdgeManager.injectEdge(null); - } - - @Test - public void testMapReduceSnapshots() throws Exception { - // create table - Connection conn = DriverManager.getConnection(getUrl()); - tableName = generateUniqueName(); - conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); - conn.commit(); - - // configure Phoenix M/R job to read snapshot - final Configuration conf = getUtility().getConfiguration(); - Job job = Job.getInstance(conf); - Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME); - - PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, - tmpDir, null, FIELD1, FIELD2, FIELD3); - - // configure and test job - configureJob(job, tableName, null, null); - } - - @Test - public void testMapReduceSnapshotsWithCondition() throws Exception { - // create table - Connection conn = DriverManager.getConnection(getUrl()); - tableName = generateUniqueName(); - conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); - conn.commit(); - - // configure Phoenix M/R job to read snapshot - final Configuration conf = getUtility().getConfiguration(); - Job job = Job.getInstance(conf); - Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME); - PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, - tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3); - - // configure and test job - configureJob(job, tableName, null, "FIELD3 > 0001"); - - } - - @Test - public void testMapReduceSnapshotWithLimit() throws Exception { - // create table - Connection conn = DriverManager.getConnection(getUrl()); - tableName = generateUniqueName(); - conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); - conn.commit(); - - // configure Phoenix M/R job to read snapshot - final Configuration conf = getUtility().getConfiguration(); - Job job = Job.getInstance(conf); - Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME); - // Running limit with order by on non pk column - String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1"; - PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, - tmpDir, inputQuery); - - // configure and test job - configureJob(job, tableName, inputQuery, null); - } - - private void configureJob(Job job, String tableName, String inputQuery, String condition) - throws Exception { - try { - upsertAndSnapshot(tableName); - result = new ArrayList<>(); - - job.setMapperClass(TableSnapshotMapper.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(NullWritable.class); - job.setOutputFormatClass(NullOutputFormat.class); - - Assert.assertTrue(job.waitForCompletion(true)); - - // verify the result, should match the values at the corresponding timestamp - Properties props = new Properties(); - props.setProperty("CurrentSCN", Long.toString(clock.time)); - StringBuilder selectQuery = new StringBuilder("SELECT * FROM " + tableName); - if (condition != null) { - selectQuery.append(" WHERE " + condition); - } - if (inputQuery == null) inputQuery = selectQuery.toString(); - - ResultSet rs = - DriverManager.getConnection(getUrl(), props).createStatement() - .executeQuery(inputQuery); - - for (List<Object> r : result) { - assertTrue("No data stored in the table!", rs.next()); - int i = 0; - String field1 = rs.getString(i + 1); - assertEquals("Got the incorrect value for field1", r.get(i++), field1); - String field2 = rs.getString(i + 1); - assertEquals("Got the incorrect value for field2", r.get(i++), field2); - int field3 = rs.getInt(i + 1); - assertEquals("Got the incorrect value for field3", r.get(i++), field3); - } - - assertFalse( - "Should only have stored " + result.size() + "rows in the table for the timestamp!", - rs.next()); - } finally { - deleteSnapshotAndTable(tableName); - } - } - - private static class MyClock extends EnvironmentEdge { - public volatile long time; - - public MyClock(long time) { - this.time = time; - } - - @Override - public long currentTime() { - return time; - } - } - - private void upsertData(String tableName) throws SQLException { - Connection conn = DriverManager.getConnection(getUrl()); - PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName)); - upsertData(stmt, "CCCC", "SSDD", 0001); - upsertData(stmt, "CCCC", "HDHG", 0005); - upsertData(stmt, "BBBB", "JSHJ", 0002); - upsertData(stmt, "AAAA", "JHHD", 0003); - conn.commit(); - } - - private void upsertData(PreparedStatement stmt, String field1, String field2, int field3) - throws SQLException { - stmt.setString(1, field1); - stmt.setString(2, field2); - stmt.setInt(3, field3); - stmt.execute(); - } - - public void upsertAndSnapshot(String tableName) throws Exception { - clock.time += 1000; - upsertData(tableName); - - Connection conn = DriverManager.getConnection(getUrl()); - HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); - admin.snapshot(SNAPSHOT_NAME, TableName.valueOf(tableName)); - // call flush to create new files in the region - admin.flush(tableName); - - List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots(); - Assert.assertEquals(tableName, snapshots.get(0).getTable()); +import java.io.IOException; +import java.sql.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.UUID; - clock.time += 1000; - // upsert data after snapshot - PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName)); - upsertData(stmt, "DDDD", "SNFB", 0004); - conn.commit(); - } +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; - public void deleteSnapshotAndTable(String tableName) throws Exception { - Connection conn = DriverManager.getConnection(getUrl()); - HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); - admin.deleteSnapshot(SNAPSHOT_NAME); +public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT { + private final static String SNAPSHOT_NAME = "FOO"; + private static final String FIELD1 = "FIELD1"; + private static final String FIELD2 = "FIELD2"; + private static final String FIELD3 = "FIELD3"; + private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " + + " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))"; + private String UPSERT = "UPSERT into %s values (?, ?, ?)"; + + private static List<List<Object>> result; + private long timestamp; + private String tableName; + + + @Test + public void testMapReduceSnapshots() throws Exception { + // create table + Connection conn = DriverManager.getConnection(getUrl()); + tableName = generateUniqueName(); + conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); + conn.commit(); + + // configure Phoenix M/R job to read snapshot + final Configuration conf = getUtility().getConfiguration(); + Job job = Job.getInstance(conf); + Path tmpDir = getUtility().getRandomDir(); + + PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, null, FIELD1, FIELD2, FIELD3); + + // configure and test job + configureJob(job, tableName, null, null); + } + + @Test + public void testMapReduceSnapshotsWithCondition() throws Exception { + // create table + Connection conn = DriverManager.getConnection(getUrl()); + tableName = generateUniqueName(); + conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); + conn.commit(); + + // configure Phoenix M/R job to read snapshot + final Configuration conf = getUtility().getConfiguration(); + Job job = Job.getInstance(conf); + Path tmpDir = getUtility().getRandomDir(); + PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3); + + // configure and test job + configureJob(job, tableName, null, "FIELD3 > 0001"); + + } + + @Test + public void testMapReduceSnapshotWithLimit() throws Exception { + // create table + Connection conn = DriverManager.getConnection(getUrl()); + tableName = generateUniqueName(); + conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); + conn.commit(); + + // configure Phoenix M/R job to read snapshot + final Configuration conf = getUtility().getConfiguration(); + Job job = Job.getInstance(conf); + Path tmpDir = getUtility().getRandomDir(); + // Running limit with order by on non pk column + String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1"; + PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir,inputQuery); + + // configure and test job + configureJob(job, tableName, inputQuery, null); + } + + private void configureJob(Job job, String tableName, String inputQuery, String condition) throws Exception { + try { + upsertAndSnapshot(tableName); + result = new ArrayList<>(); + + job.setMapperClass(TableSnapshotMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(NullWritable.class); + job.setOutputFormatClass(NullOutputFormat.class); + + Assert.assertTrue(job.waitForCompletion(true)); + + // verify the result, should match the values at the corresponding timestamp + Properties props = new Properties(); + props.setProperty("CurrentSCN", Long.toString(timestamp)); + + StringBuilder selectQuery = new StringBuilder("SELECT * FROM " + tableName); + if (condition != null) { + selectQuery.append(" WHERE " + condition); + } + if (inputQuery == null) + inputQuery = selectQuery.toString(); + + ResultSet rs = DriverManager.getConnection(getUrl(), props).createStatement().executeQuery(inputQuery); + + for (List<Object> r : result) { + assertTrue("No data stored in the table!", rs.next()); + int i = 0; + String field1 = rs.getString(i + 1); + assertEquals("Got the incorrect value for field1", r.get(i++), field1); + String field2 = rs.getString(i + 1); + assertEquals("Got the incorrect value for field2", r.get(i++), field2); + int field3 = rs.getInt(i + 1); + assertEquals("Got the incorrect value for field3", r.get(i++), field3); + } + + assertFalse("Should only have stored" + result.size() + "rows in the table for the timestamp!", rs.next()); + } finally { + deleteSnapshotAndTable(tableName); } - - public static class TableSnapshotMapper extends - Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> { - - @Override - protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context) - throws IOException, InterruptedException { - final List<Object> values = record.getValues(); - result.add(values); - - // write dummy data - context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()), - NullWritable.get()); - } + } + + private void upsertData(String tableName) throws SQLException { + Connection conn = DriverManager.getConnection(getUrl()); + PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName)); + upsertData(stmt, "CCCC", "SSDD", 0001); + upsertData(stmt, "CCCC", "HDHG", 0005); + upsertData(stmt, "BBBB", "JSHJ", 0002); + upsertData(stmt, "AAAA", "JHHD", 0003); + conn.commit(); + timestamp = System.currentTimeMillis(); + } + + private void upsertData(PreparedStatement stmt, String field1, String field2, int field3) throws SQLException { + stmt.setString(1, field1); + stmt.setString(2, field2); + stmt.setInt(3, field3); + stmt.execute(); + } + + public void upsertAndSnapshot(String tableName) throws Exception { + upsertData(tableName); + + Connection conn = DriverManager.getConnection(getUrl()); + HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); + admin.snapshot(SNAPSHOT_NAME, TableName.valueOf(tableName)); + // call flush to create new files in the region + admin.flush(tableName); + + List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots(); + Assert.assertEquals(tableName, snapshots.get(0).getTable()); + + // upsert data after snapshot + PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName)); + upsertData(stmt, "DDDD", "SNFB", 0004); + conn.commit(); + } + + public void deleteSnapshotAndTable(String tableName) throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); + admin.deleteSnapshot(SNAPSHOT_NAME); + + conn.createStatement().execute("DROP TABLE " + tableName); + conn.close(); + + } + + public static class TableSnapshotMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> { + + @Override + protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context) + throws IOException, InterruptedException { + final List<Object> values = record.getValues(); + result.add(values); + + // write dummy data + context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()), + NullWritable.get()); } + } }
