PHOENIX-1390 Stats not updated on client after major compaction
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7a8a023a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7a8a023a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7a8a023a
Branch: refs/heads/4.2
Commit: 7a8a023a3d2bdb694f02fc1560a0f5eb35294a96
Parents: 851f57a
Author: James Taylor <[email protected]>
Authored: Tue Oct 28 12:44:37 2014 -0700
Committer: James Taylor <[email protected]>
Committed: Tue Oct 28 12:44:37 2014 -0700
----------------------------------------------------------------------
 .../end2end/BaseClientManagedTimeIT.java | 15 +++-
 .../org/apache/phoenix/end2end/BaseQueryIT.java | 3 +-
 .../end2end/ClientTimeArithmeticQueryIT.java | 43 +++++++++++
 .../phoenix/end2end/InMemoryOrderByIT.java | 4 +-
 .../org/apache/phoenix/end2end/QueryIT.java | 24 ++++--
 .../apache/phoenix/end2end/ReverseScanIT.java | 2 +-
 .../org/apache/phoenix/end2end/SequenceIT.java | 7 +-
 .../phoenix/end2end/SpooledOrderByIT.java | 4 +-
 .../phoenix/end2end/StatsCollectorIT.java | 55 +++++++++++++-
 .../apache/phoenix/end2end/UpsertSelectIT.java | 4 +-
 .../phoenix/compile/ExpressionCompiler.java | 2 +-
 .../coprocessor/MetaDataEndpointImpl.java | 35 ++-------
 .../UngroupedAggregateRegionObserver.java | 21 ++++--
 .../org/apache/phoenix/query/QueryServices.java | 1 +
 .../phoenix/query/QueryServicesOptions.java | 5 +-
 .../apache/phoenix/schema/MetaDataClient.java | 4 +-
 .../org/apache/phoenix/schema/PTableImpl.java | 2 +-
 .../phoenix/schema/stats/PTableStats.java | 7 ++
 .../phoenix/schema/stats/PTableStatsImpl.java | 12 ++-
 .../schema/stats/StatisticsCollector.java | 79 ++++++++++++--------
 .../phoenix/schema/stats/StatisticsScanner.java | 1 -
 .../phoenix/schema/stats/StatisticsUtil.java | 6 +-
 .../phoenix/schema/stats/StatisticsWriter.java | 39 ++++++----
 23 files changed, 259 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java index 14dffcb..1acd5b3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java @@ -17,16 +17,21 @@ */ package org.apache.phoenix.end2end; +import java.util.Map; + import javax.annotation.concurrent.NotThreadSafe; import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.ReadOnlyProps; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; +import com.google.common.collect.Maps; + /** * Base class for tests that manage their own time stamps * We need to separate these from tests that rely on hbase to set @@ -54,9 +59,17 @@ public abstract class BaseClientManagedTimeIT extends BaseTest { deletePriorTables(ts - 1, getUrl()); } + public static Map<String,String> getDefaultProps() { + Map<String,String> props = Maps.newHashMapWithExpectedSize(5); + // Must update config before starting server + props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, Boolean.FALSE.toString()); + return props; + } + @BeforeClass public static void doSetup() throws Exception { - setUpTestDriver(ReadOnlyProps.EMPTY_PROPS); + Map<String,String> props = getDefaultProps(); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } @AfterClass http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java index 7a3e86e..f3031f4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java @@ -48,7 +48,6 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; @@ -70,7 +69,7 @@ public abstract class BaseQueryIT extends BaseClientManagedTimeIT { @BeforeClass @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) public static void doSetup() throws Exception { - Map<String,String> props = Maps.newHashMapWithExpectedSize(5); + Map<String,String> props = getDefaultProps(); props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000)); props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100)); // Make a small batch size to test multiple calls to reserve sequences http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java index 98b233c..d709b9c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java @@ -49,6 +49,7 @@ import java.util.Properties; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.TestUtil; import org.junit.Test; import org.junit.experimental.categories.Category; 
import org.junit.runner.RunWith; @@ -596,5 +597,47 @@ public class ClientTimeArithmeticQueryIT extends BaseQueryIT { } } + @Test + public void testDateDateSubtract() throws Exception { + String url; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + + url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 15); + Connection conn = DriverManager.getConnection(url, props); + PreparedStatement statement = conn.prepareStatement("UPSERT INTO ATABLE(organization_id,entity_id,a_time) VALUES(?,?,?)"); + statement.setString(1, getOrganizationId()); + statement.setString(2, ROW2); + statement.setDate(3, date); + statement.execute(); + statement.setString(2, ROW3); + statement.setDate(3, date); + statement.execute(); + statement.setString(2, ROW4); + statement.setDate(3, new Date(date.getTime() + TestUtil.MILLIS_IN_DAY - 1)); + statement.execute(); + statement.setString(2, ROW6); + statement.setDate(3, new Date(date.getTime() + TestUtil.MILLIS_IN_DAY - 1)); + statement.execute(); + statement.setString(2, ROW9); + statement.setDate(3, date); + statement.execute(); + conn.commit(); + conn.close(); + + url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 25); + conn = DriverManager.getConnection(url, props); + try { + statement = conn.prepareStatement("SELECT entity_id, b_string FROM ATABLE WHERE a_date - a_time > 1"); + ResultSet rs = statement.executeQuery(); + @SuppressWarnings("unchecked") + List<List<Object>> expectedResults = Lists.newArrayList( + Arrays.<Object>asList(ROW3, E_VALUE), + Arrays.<Object>asList( ROW6, E_VALUE), + Arrays.<Object>asList(ROW9, E_VALUE)); + assertValuesEqualsResultSet(rs, expectedResults); + } finally { + conn.close(); + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java index 48a0581..533143c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java @@ -24,8 +24,6 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; -import com.google.common.collect.Maps; - @Category(ClientManagedTimeTest.class) public class InMemoryOrderByIT extends OrderByIT { @@ -35,7 +33,7 @@ public class InMemoryOrderByIT extends OrderByIT { @BeforeClass @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) public static void doSetup() throws Exception { - Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + Map<String,String> props = getDefaultProps(); props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1024*1024)); // Must update config before starting server setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java index f45b689..fe65e10 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java @@ -250,7 +250,7 @@ public class QueryIT extends BaseQueryIT { @Test public void testPointInTimeScan() throws Exception { // Override value that was set at creation time - String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1); // Run query at timestamp 5 + String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 
10); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection upsertConn = DriverManager.getConnection(url, props); String upsertStmt = @@ -267,13 +267,15 @@ public class QueryIT extends BaseQueryIT { stmt.setString(2, ROW4); stmt.setInt(3, 5); stmt.execute(); // should commit too - Connection conn1 = DriverManager.getConnection(getUrl(), props); + + url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 15); + Connection conn1 = DriverManager.getConnection(url, props); analyzeTable(conn1, "ATABLE"); conn1.close(); upsertConn.close(); // Override value again, but should be ignored since it's past the SCN - url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 3); // Run query at timestamp 5 + url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 30); upsertConn = DriverManager.getConnection(url, props); upsertConn.setAutoCommit(true); // Test auto commit // Insert all rows at ts @@ -285,7 +287,7 @@ public class QueryIT extends BaseQueryIT { upsertConn.close(); String query = "SELECT organization_id, a_string AS a FROM atable WHERE organization_id=? 
and a_integer = 5"; - props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 20)); Connection conn = DriverManager.getConnection(getUrl(), props); PreparedStatement statement = conn.prepareStatement(query); statement.setString(1, tenantId); @@ -394,7 +396,7 @@ public class QueryIT extends BaseQueryIT { " A_TIMESTAMP) " + "VALUES (?, ?, ?)"; // Override value that was set at creation time - String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1); + String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 10); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection upsertConn = DriverManager.getConnection(url, props); upsertConn.setAutoCommit(true); // Test auto commit @@ -405,9 +407,12 @@ public class QueryIT extends BaseQueryIT { byte[] ts1 = PDataType.TIMESTAMP.toBytes(tsValue1); stmt.setTimestamp(3, tsValue1); stmt.execute(); - Connection conn1 = DriverManager.getConnection(getUrl(), props); + + url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 15); + Connection conn1 = DriverManager.getConnection(url, props); analyzeTable(conn1, "ATABLE"); conn1.close(); + updateStmt = "upsert into " + "ATABLE(" + @@ -426,15 +431,18 @@ public class QueryIT extends BaseQueryIT { stmt.setTime(4, new Time(tsValue2.getTime())); stmt.execute(); upsertConn.close(); - conn1 = DriverManager.getConnection(getUrl(), props); + + url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 20); + conn1 = DriverManager.getConnection(url, props); analyzeTable(conn1, "ATABLE"); conn1.close(); + analyzeTable(upsertConn, "ATABLE"); assertTrue(compare(CompareOp.GREATER, new ImmutableBytesWritable(ts2), new ImmutableBytesWritable(ts1))); assertFalse(compare(CompareOp.GREATER, new ImmutableBytesWritable(ts1), new ImmutableBytesWritable(ts1))); String query = "SELECT entity_id, a_timestamp, a_time FROM aTable 
WHERE organization_id=? and a_timestamp > ?"; - props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 3)); // Execute at timestamp 2 + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 30)); // Execute at timestamp 2 Connection conn = DriverManager.getConnection(getUrl(), props); try { PreparedStatement statement = conn.prepareStatement(query); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java index 26d6d4b..e279710 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java @@ -50,7 +50,7 @@ import com.google.common.collect.Maps; @Category(HBaseManagedTimeTest.class) public class ReverseScanIT extends BaseHBaseManagedTimeIT { @BeforeClass - @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) + @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) public static void doSetup() throws Exception { Map<String,String> props = Maps.newHashMapWithExpectedSize(1); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java index 4f2b9a9..b4b0b2e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java @@ -51,7 +51,6 @@ import org.junit.Test; import 
org.junit.experimental.categories.Category; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; @Category(ClientManagedTimeTest.class) public class SequenceIT extends BaseClientManagedTimeIT { @@ -63,11 +62,9 @@ public class SequenceIT extends BaseClientManagedTimeIT { @BeforeClass @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) public static void doSetup() throws Exception { - - Map<String,String> props = Maps.newHashMapWithExpectedSize(1); - // Make a small batch size to test multiple calls to reserve sequences - props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, Long.toString(BATCH_SIZE)); + Map<String,String> props = getDefaultProps(); // Must update config before starting server + props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, Long.toString(BATCH_SIZE)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java index 2533a29..c35ecab 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java @@ -24,15 +24,13 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; -import com.google.common.collect.Maps; - @Category(ClientManagedTimeTest.class) public class SpooledOrderByIT extends OrderByIT { @BeforeClass @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) public static void doSetup() throws Exception { - Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + Map<String,String> props = getDefaultProps(); 
props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(100)); // Must update config before starting server setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java index 51ad543..b9a0e88 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java @@ -18,6 +18,8 @@ package org.apache.phoenix.end2end; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.apache.phoenix.util.TestUtil.getAllSplits; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -27,9 +29,15 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -41,7 +49,8 @@ import com.google.common.collect.Maps; @Category(NeedsOwnMiniClusterTest.class) public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT { - + private static final String STATS_TEST_TABLE_NAME = "S"; + @BeforeClass public static void doSetup() throws Exception { Map<String,String> props = 
Maps.newHashMapWithExpectedSize(3); @@ -222,4 +231,48 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT { return stmt; } + private void compactTable(Connection conn) throws IOException, InterruptedException, SQLException { + ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + HBaseAdmin admin = services.getAdmin(); + try { + admin.flush(STATS_TEST_TABLE_NAME); + admin.majorCompact(STATS_TEST_TABLE_NAME); + Thread.sleep(10000); // FIXME: how do we know when compaction is done? + } finally { + admin.close(); + } + services.clearCache(); + } + + @Test + public void testCompactUpdatesStats() throws Exception { + int nRows = 10; + Connection conn; + PreparedStatement stmt; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE TABLE " + STATS_TEST_TABLE_NAME + "(k CHAR(1) PRIMARY KEY, v INTEGER) " + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + Boolean.FALSE); + stmt = conn.prepareStatement("UPSERT INTO " + STATS_TEST_TABLE_NAME + " VALUES(?,?)"); + for (int i = 0; i < nRows; i++) { + stmt.setString(1, Character.toString((char) ('a' + i))); + stmt.setInt(2, i); + stmt.executeUpdate(); + } + conn.commit(); + + compactTable(conn); + conn = DriverManager.getConnection(getUrl(), props); + List<KeyRange>keyRanges = getAllSplits(conn, STATS_TEST_TABLE_NAME); + assertEquals(nRows+1, keyRanges.size()); + + int nDeletedRows = conn.createStatement().executeUpdate("DELETE FROM " + STATS_TEST_TABLE_NAME + " WHERE V < 5"); + conn.commit(); + assertEquals(5, nDeletedRows); + + compactTable(conn); + + keyRanges = getAllSplits(conn, STATS_TEST_TABLE_NAME); + assertEquals(nRows/2+1, keyRanges.size()); + + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java index 642ba62..ac54fe4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java @@ -55,8 +55,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.common.collect.Maps; - @Category(ClientManagedTimeTest.class) public class UpsertSelectIT extends BaseClientManagedTimeIT { @@ -64,7 +62,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT { @BeforeClass @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) public static void doSetup() throws Exception { - Map<String,String> props = Maps.newHashMapWithExpectedSize(5); + Map<String,String> props = getDefaultProps(); props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(500)); props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java index e06a88f..3876b8a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java @@ -891,7 +891,7 @@ public class ExpressionCompiler extends UnsupportedAllParseNodeVisitor<Expressio if (isType1Date || isType2Date) { if (isType1Date && isType2Date) { i = 2; - theType = PDataType.LONG; + theType = PDataType.DECIMAL; } else if (isType1Date && type2 != null && type2.isCoercibleTo(PDataType.DECIMAL)) { i = 2; 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index b90fb2e..3abd206 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -608,6 +608,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso try { statsHTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME); stats = StatisticsUtil.readStatistics(statsHTable, physicalTableName.getBytes(), clientTimeStamp); + timeStamp = Math.max(timeStamp, stats.getTimestamp()); } catch (org.apache.hadoop.hbase.TableNotFoundException e) { logger.warn(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME + " not online yet?"); } finally { @@ -1264,32 +1265,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } - private PTable incrementTableTimestamp(byte[] key, long clientTimeStamp) throws IOException, SQLException { - HRegion region = env.getRegion(); - RowLock lid = region.getRowLock(key); - if (lid == null) { - throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key)); - } - try { - PTable table = doGetTable(key, clientTimeStamp, lid); - if (table != null) { - long tableTimeStamp = table.getTimeStamp() + 1; - List<Mutation> mutations = Lists.newArrayListWithExpectedSize(1); - Put p = new Put(key); - p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, tableTimeStamp, ByteUtil.EMPTY_BYTE_ARRAY); - mutations.add(p); - region.mutateRowsWithLocks(mutations, Collections.<byte[]> emptySet()); - - Cache<ImmutableBytesPtr, PTable> 
metaDataCache = GlobalCache.getInstance(env).getMetaDataCache(); - ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key); - metaDataCache.invalidate(cacheKey); - } - return table; - } finally { - lid.release(); - } - } - private PTable doGetTable(byte[] key, long clientTimeStamp) throws IOException, SQLException { return doGetTable(key, clientTimeStamp, null); } @@ -1711,9 +1686,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] tableName = request.getTableName().toByteArray(); try { byte[] tenantId = request.getTenantId().toByteArray(); - long clientTimeStamp = request.getClientTimestamp(); - byte[] tableKey = SchemaUtil.getTableKey(tenantId, schemaName, tableName); - incrementTableTimestamp(tableKey, clientTimeStamp); + byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName); + ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key); + Cache<ImmutableBytesPtr, PTable> metaDataCache = + GlobalCache.getInstance(this.env).getMetaDataCache(); + metaDataCache.invalidate(cacheKey); } catch (Throwable t) { logger.error("incrementTableTimeStamp failed", t); ProtobufUtil.setControllerException(controller, http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 710409f..aba35fe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -72,6 +72,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.join.HashJoinInfo; import 
org.apache.phoenix.join.TupleProjector; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ConstraintViolationException; import org.apache.phoenix.schema.PColumn; @@ -459,9 +460,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME) && scanType.equals(ScanType.COMPACT_DROP_DELETES)) { try { - // TODO: for users that manage timestamps themselves, we should provide - // a means of specifying/getting this. - long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime(); + boolean useCurrentTime = + c.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME); + // Provides a means of clients controlling their timestamps to not use current time + // when background tasks are updating stats. Instead we track the max timestamp of + // the cells and use that. + long clientTimeStamp = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : StatisticsCollector.NO_TIMESTAMP; StatisticsCollector stats = new StatisticsCollector(c.getEnvironment(), table.getNameAsString(), clientTimeStamp); internalScan = stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanners, scanType, earliestPutTs, s); @@ -485,9 +490,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) { StatisticsCollector stats = null; try { - // TODO: for users that manage timestamps themselves, we should provide - // a means of specifying/getting this. 
- long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime(); + boolean useCurrentTime = + e.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME); + // Provides a means of clients controlling their timestamps to not use current time + // when background tasks are updating stats. Instead we track the max timestamp of + // the cells and use that. + long clientTimeStamp = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : StatisticsCollector.NO_TIMESTAMP; stats = new StatisticsCollector(e.getEnvironment(), table.getNameAsString(), clientTimeStamp); stats.collectStatsDuringSplit(e.getEnvironment().getConfiguration(), l, r, region); } catch (IOException ioe) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 7f000c0..72002ae 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -136,6 +136,7 @@ public interface QueryServices extends SQLCloseable { public static final String MIN_STATS_UPDATE_FREQ_MS_ATTRIB = "phoenix.stats.minUpdateFrequency"; public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB = "phoenix.stats.guidepost.width"; public static final String STATS_GUIDEPOST_PER_REGION_ATTRIB = "phoenix.stats.guidepost.per.region"; + public static final String STATS_USE_CURRENT_TIME_ATTRIB = "phoenix.stats.useCurrentTime"; public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets"; /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 7ee225b..7c8ecd4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -52,6 +52,7 @@ import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY; import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB; +import static org.apache.phoenix.query.QueryServices.STATS_USE_CURRENT_TIME_ATTRIB; import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB; @@ -146,7 +147,8 @@ public class QueryServicesOptions { public static final double DEFAULT_TRACING_PROBABILITY_THRESHOLD = 0.05; public static final int DEFAULT_STATS_UPDATE_FREQ_MS = 15 * 60000; // 15min - public static final int DEFAULT_GUIDE_POSTS_PER_REGION = 20; + public static final long DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES = 100 * 1024 *1024; // 100MB + public static final boolean DEFAULT_STATS_USE_CURRENT_TIME = true; public static final boolean DEFAULT_USE_REVERSE_SCAN = true; @@ -175,6 +177,7 @@ public class QueryServicesOptions { public static QueryServicesOptions withDefaults() { Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); QueryServicesOptions options = new QueryServicesOptions(config) + .setIfUnset(STATS_USE_CURRENT_TIME_ATTRIB, DEFAULT_STATS_USE_CURRENT_TIME) .setIfUnset(KEEP_ALIVE_MS_ATTRIB, DEFAULT_KEEP_ALIVE_MS) 
.setIfUnset(THREAD_POOL_SIZE_ATTRIB, DEFAULT_THREAD_POOL_SIZE) .setIfUnset(QUEUE_SIZE_ATTRIB, DEFAULT_QUEUE_SIZE) http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index afe21e8..b763bbb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -613,13 +613,13 @@ public class MetaDataClient { Long scn = connection.getSCN(); // Always invalidate the cache long clientTimeStamp = connection.getSCN() == null ? HConstants.LATEST_TIMESTAMP : scn; - String query = "SELECT CURRENT_DATE() - " + LAST_STATS_UPDATE_TIME + " FROM " + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME + String query = "SELECT CURRENT_DATE()," + LAST_STATS_UPDATE_TIME + " FROM " + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME + " WHERE " + PHYSICAL_NAME + "='" + physicalName.getString() + "' AND " + COLUMN_FAMILY + " IS NULL AND " + REGION_NAME + " IS NULL AND " + LAST_STATS_UPDATE_TIME + " IS NOT NULL"; ResultSet rs = connection.createStatement().executeQuery(query); long msSinceLastUpdate = Long.MAX_VALUE; if (rs.next()) { - msSinceLastUpdate = rs.getLong(1); + msSinceLastUpdate = rs.getLong(1) - rs.getLong(2); } if (msSinceLastUpdate < msMinBetweenUpdates) { return 0; http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index 2448f39..8f85ccc 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -886,7 +886,7 @@ public class PTableImpl implements PTable { GuidePostsInfo info = new GuidePostsInfo(pTableStatsProto.getGuidePostsByteCount(), value); tableGuidePosts.put(pTableStatsProto.getKey().toByteArray(), info); } - PTableStats stats = new PTableStatsImpl(tableGuidePosts); + PTableStats stats = new PTableStatsImpl(tableGuidePosts, timeStamp); PName dataTableName = null; if (table.hasDataTableNameBytes()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java index 3745487..435fe87 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java @@ -37,6 +37,11 @@ public interface PTableStats { public int getEstimatedSize() { return 0; } + + @Override + public long getTimestamp() { + return StatisticsCollector.NO_TIMESTAMP; + } }; /** @@ -47,4 +52,6 @@ public interface PTableStats { SortedMap<byte[], GuidePostsInfo> getGuidePosts(); int getEstimatedSize(); + + long getTimestamp(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java index dcf7b00..dc70e86 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java @@ -23,6 +23,7 @@ import java.util.SortedMap; import java.util.TreeMap; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.util.SizedUtil; import com.sun.istack.NotNull; @@ -33,13 +34,15 @@ import com.sun.istack.NotNull; public class PTableStatsImpl implements PTableStats { private final SortedMap<byte[], GuidePostsInfo> guidePosts; private final int estimatedSize; + private final long timeStamp; public PTableStatsImpl() { - this(new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR)); + this(new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR), MetaDataProtocol.MIN_TABLE_TIMESTAMP); } - public PTableStatsImpl(@NotNull SortedMap<byte[], GuidePostsInfo> guidePosts) { + public PTableStatsImpl(@NotNull SortedMap<byte[], GuidePostsInfo> guidePosts, long timeStamp) { this.guidePosts = guidePosts; + this.timeStamp = timeStamp; int estimatedSize = SizedUtil.OBJECT_SIZE + SizedUtil.INT_SIZE + SizedUtil.sizeOfTreeMap(guidePosts.size()); for (Map.Entry<byte[], GuidePostsInfo> entry : guidePosts.entrySet()) { byte[] cf = entry.getKey(); @@ -84,4 +87,9 @@ public class PTableStatsImpl implements PTableStats { public int getEstimatedSize() { return estimatedSize; } + + @Override + public long getTimestamp() { + return timeStamp; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java index 53bd18a..3511d12 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java @@ -23,8 +23,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; @@ -45,12 +43,15 @@ import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.TimeKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -64,33 +65,45 @@ import com.google.common.collect.Maps; * board for now. */ public class StatisticsCollector { + private static final Logger logger = LoggerFactory.getLogger(StatisticsCollector.class); + public static final long NO_TIMESTAMP = -1; private Map<String, byte[]> minMap = Maps.newHashMap(); private Map<String, byte[]> maxMap = Maps.newHashMap(); private long guidepostDepth; + private boolean useCurrentTime; + private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP; private Map<String, Pair<Long,GuidePostsInfo>> guidePostsMap = Maps.newHashMap(); // Tracks the bytecount per family if it has reached the guidePostsDepth private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap(); protected StatisticsWriter statsTable; - // Ensures that either analyze or compaction happens at any point of time. 
- private static final Log LOG = LogFactory.getLog(StatisticsCollector.class); public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp) throws IOException { Configuration config = env.getConfiguration(); HTableInterface statsHTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES)); - long maxFileSize = statsHTable.getTableDescriptor().getMaxFileSize(); - if (maxFileSize <= 0) { // HBase brain dead API doesn't give you the "real" max file size if it's not set... - maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE; + useCurrentTime = + config.getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME); + int guidepostPerRegion = config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, 0); + if (guidepostPerRegion > 0) { + long maxFileSize = statsHTable.getTableDescriptor().getMaxFileSize(); + if (maxFileSize <= 0) { // HBase brain dead API doesn't give you the "real" max file size if it's not set... 
+ maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE; + } + guidepostDepth = maxFileSize / guidepostPerRegion; + } else { + guidepostDepth = config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES); } - guidepostDepth = - config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, - maxFileSize / config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, - QueryServicesOptions.DEFAULT_GUIDE_POSTS_PER_REGION)); // Get the stats table associated with the current table on which the CP is // triggered this.statsTable = StatisticsWriter.newWriter(statsHTable, tableName, clientTimeStamp); } + public long getMaxTimeStamp() { + return maxTimeStamp; + } + public void close() throws IOException { this.statsTable.close(); } @@ -99,12 +112,12 @@ public class StatisticsCollector { try { ArrayList<Mutation> mutations = new ArrayList<Mutation>(); writeStatsToStatsTable(region, true, mutations, TimeKeeper.SYSTEM.getCurrentTime()); - if (LOG.isDebugEnabled()) { - LOG.debug("Committing new stats for the region " + region.getRegionInfo()); + if (logger.isDebugEnabled()) { + logger.debug("Committing new stats for the region " + region.getRegionInfo()); } commitStats(mutations); } catch (IOException e) { - LOG.error(e); + logger.error("Unable to commit new stats", e); } finally { clear(); } @@ -116,20 +129,20 @@ public class StatisticsCollector { // update the statistics table for (ImmutableBytesPtr fam : familyMap.keySet()) { if (delete) { - if(LOG.isDebugEnabled()) { - LOG.debug("Deleting the stats for the region "+region.getRegionInfo()); + if(logger.isDebugEnabled()) { + logger.debug("Deleting the stats for the region "+region.getRegionInfo()); } statsTable.deleteStats(region.getRegionInfo().getRegionNameAsString(), this, Bytes.toString(fam.copyBytesIfNecessary()), mutations); } - if(LOG.isDebugEnabled()) { - LOG.debug("Adding new stats for the region "+region.getRegionInfo()); + if(logger.isDebugEnabled()) { 
+ logger.debug("Adding new stats for the region "+region.getRegionInfo()); } statsTable.addStats((region.getRegionInfo().getRegionNameAsString()), this, Bytes.toString(fam.copyBytesIfNecessary()), mutations); } } catch (IOException e) { - LOG.error("Failed to update statistics table!", e); + logger.error("Failed to update statistics table!", e); throw e; } } @@ -147,7 +160,7 @@ public class StatisticsCollector { mutations); } } catch (IOException e) { - LOG.error("Failed to delete from statistics table!", e); + logger.error("Failed to delete from statistics table!", e); throw e; } } @@ -195,8 +208,8 @@ public class StatisticsCollector { internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType, smallestReadPoint, earliestPutTs); } - if (LOG.isDebugEnabled()) { - LOG.debug("Compaction scanner created for stats"); + if (logger.isDebugEnabled()) { + logger.debug("Compaction scanner created for stats"); } InternalScanner scanner = getInternalScanner(region, store, internalScan, store.getColumnFamilyName()); if (scanner != null) { @@ -212,22 +225,22 @@ public class StatisticsCollector { // Create a delete operation on the parent region // Then write the new guide posts for individual regions List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3); - long currentTime = TimeKeeper.SYSTEM.getCurrentTime(); + long currentTime = useCurrentTime ? 
TimeKeeper.SYSTEM.getCurrentTime() : -1; deleteStatsFromStatsTable(region, mutations, currentTime); - if (LOG.isDebugEnabled()) { - LOG.debug("Collecting stats for the daughter region " + l.getRegionInfo()); + if (logger.isDebugEnabled()) { + logger.debug("Collecting stats for the daughter region " + l.getRegionInfo()); } collectStatsForSplitRegions(conf, l, mutations, currentTime); - if (LOG.isDebugEnabled()) { - LOG.debug("Collecting stats for the daughter region " + r.getRegionInfo()); + if (logger.isDebugEnabled()) { + logger.debug("Collecting stats for the daughter region " + r.getRegionInfo()); } collectStatsForSplitRegions(conf, r, mutations, currentTime); - if (LOG.isDebugEnabled()) { - LOG.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo()); + if (logger.isDebugEnabled()) { + logger.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo()); } commitStats(mutations); } catch (IOException e) { - LOG.error("Error while capturing stats after split of region " + logger.error("Error while capturing stats after split of region " + region.getRegionInfo().getRegionNameAsString(), e); } } @@ -244,13 +257,13 @@ public class StatisticsCollector { count = scanRegion(scanner, count); writeStatsToStatsTable(daughter, false, mutations, currentTime); } catch (IOException e) { - LOG.error(e); + logger.error("Unable to collects stats during split", e); toThrow = e; } finally { try { if (scanner != null) scanner.close(); } catch (IOException e) { - LOG.error(e); + logger.error("Unable to close scanner after split", e); if (toThrow != null) toThrow = e; } finally { if (toThrow != null) throw toThrow; @@ -278,6 +291,7 @@ public class StatisticsCollector { this.minMap.clear(); this.guidePostsMap.clear(); this.familyMap.clear(); + maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP; } public void updateStatistic(KeyValue kv) { @@ -302,6 +316,7 @@ public class StatisticsCollector { maxMap.put(fam, row); } } + 
maxTimeStamp = Math.max(maxTimeStamp, kv.getTimestamp()); // TODO : This can be moved to an interface so that we could collect guide posts in different ways Pair<Long,GuidePostsInfo> gps = guidePostsMap.get(fam); if (gps == null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java index 60b9601..3a84cfc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java @@ -35,7 +35,6 @@ public class StatisticsScanner implements InternalScanner { public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, HRegion region, InternalScanner delegate, byte[] family) { - // should there be only one tracker? 
this.tracker = tracker; this.stats = stats; this.delegate = delegate; http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java index b8d64bd..eb183e6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java @@ -72,6 +72,7 @@ public class StatisticsUtil { s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES); ResultScanner scanner = statsHTable.getScanner(s); Result result = null; + long timeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP; TreeMap<byte[], GuidePostsInfo> guidePostsPerCf = new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR); while ((result = scanner.next()) != null) { CellScanner cellScanner = result.cellScanner(); @@ -88,10 +89,13 @@ public class StatisticsUtil { if (oldInfo != null) { newInfo.combine(oldInfo); } + if (current.getTimestamp() > timeStamp) { + timeStamp = current.getTimestamp(); + } } } if (!guidePostsPerCf.isEmpty()) { - return new PTableStatsImpl(guidePostsPerCf); + return new PTableStatsImpl(guidePostsPerCf, timeStamp); } return PTableStats.EMPTY_STATS; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java index 6da135e..22f0ead 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java @@ -62,7 +62,9 @@ public class StatisticsWriter implements Closeable { clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime(); } StatisticsWriter statsTable = new StatisticsWriter(hTable, tableName, clientTimeStamp); - statsTable.commitLastStatsUpdatedTime(); + if (clientTimeStamp != StatisticsCollector.NO_TIMESTAMP) { // Otherwise we do this later as we don't know the ts yet + statsTable.commitLastStatsUpdatedTime(); + } return statsTable; } @@ -101,26 +103,31 @@ public class StatisticsWriter implements Closeable { */ public void addStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException { if (tracker == null) { return; } - + boolean useMaxTimeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP; + long timeStamp = clientTimeStamp; + if (useMaxTimeStamp) { // When using max timestamp, we write the update time later because we only know the ts now + timeStamp = tracker.getMaxTimeStamp(); + mutations.add(getLastStatsUpdatedTimePut(timeStamp)); + } byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam), PDataType.VARCHAR.toBytes(regionName)); Put put = new Put(prefix); GuidePostsInfo gp = tracker.getGuidePosts(fam); if (gp != null) { put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_COUNT_BYTES, - clientTimeStamp, PDataType.LONG.toBytes((gp.getGuidePosts().size()))); + timeStamp, PDataType.LONG.toBytes((gp.getGuidePosts().size()))); put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES, - clientTimeStamp, PDataType.VARBINARY.toBytes(gp.toBytes())); + timeStamp, PDataType.VARBINARY.toBytes(gp.toBytes())); put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES, - clientTimeStamp, 
PDataType.LONG.toBytes(gp.getByteCount())); + timeStamp, PDataType.LONG.toBytes(gp.getByteCount())); } put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_KEY_BYTES, - clientTimeStamp, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam))); + timeStamp, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam))); put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES, - clientTimeStamp, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam))); + timeStamp, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam))); // Add our empty column value so queries behave correctly put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, - clientTimeStamp, ByteUtil.EMPTY_BYTE_ARRAY); + timeStamp, ByteUtil.EMPTY_BYTE_ARRAY); mutations.add(put); } @@ -153,21 +160,27 @@ public class StatisticsWriter implements Closeable { } } - private void commitLastStatsUpdatedTime() throws IOException { - // Always use wallclock time for this, as it's a mechanism to prevent - // stats from being collected too often. + private Put getLastStatsUpdatedTimePut(long timeStamp) { long currentTime = TimeKeeper.SYSTEM.getCurrentTime(); byte[] prefix = tableName; Put put = new Put(prefix); - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, clientTimeStamp, + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, timeStamp, PDataType.DATE.toBytes(new Date(currentTime))); + return put; + } + + private void commitLastStatsUpdatedTime() throws IOException { + // Always use wallclock time for this, as it's a mechanism to prevent + // stats from being collected too often. 
+ Put put = getLastStatsUpdatedTimePut(clientTimeStamp); statisticsTable.put(put); } public void deleteStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException { + long timeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP ? tracker.getMaxTimeStamp() : clientTimeStamp; byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam), PDataType.VARCHAR.toBytes(regionName)); - mutations.add(new Delete(prefix, clientTimeStamp - 1)); + mutations.add(new Delete(prefix, timeStamp - 1)); } } \ No newline at end of file
