Repository: ambari Updated Branches: refs/heads/branch-2.2 33f600f00 -> f1894bd76
AMBARI-15361. Fix ordering of Alter table calls which could result in Region Close issue. Futher improvements. (swagle) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f1894bd7 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f1894bd7 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f1894bd7 Branch: refs/heads/branch-2.2 Commit: f1894bd762b12848553136250ccca5645b3211ac Parents: 33f600f Author: Siddharth Wagle <[email protected]> Authored: Wed Mar 9 16:33:40 2016 -0800 Committer: Siddharth Wagle <[email protected]> Committed: Wed Mar 9 16:33:40 2016 -0800 ---------------------------------------------------------------------- .../timeline/HBaseTimelineMetricStore.java | 4 +- .../metrics/timeline/PhoenixHBaseAccessor.java | 145 +++++++------------ .../timeline/ITPhoenixHBaseAccessor.java | 29 +++- 3 files changed, 75 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/f1894bd7/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index 465fe95..2f080e3 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -86,9 +86,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor, metricsConf); metricMetadataManager.initializeMetadata(); // Initialize policies before TTL update - hBaseAccessor.initPolicies(); - // Alter TTL on tables - hBaseAccessor.alterMetricTableTTL(); + hBaseAccessor.initPoliciesAndTTL(); if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) { LOG.info("Using group by aggregators for aggregating host and cluster metrics."); http://git-wip-us.apache.org/repos/asf/ambari/blob/f1894bd7/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 48be4ee..611cb92 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.util.RetryCounter; @@ -46,7 +47,6 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.phoenix.exception.PhoenixIOException; -import org.apache.phoenix.exception.SQLExceptionCode; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; @@ -83,7 +83,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_SPLIT_POINTS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL; @@ -148,14 +147,7 @@ public class PhoenixHBaseAccessor { static final String BLOCKING_STORE_FILES_KEY = "hbase.hstore.blockingStoreFiles"; - private final String precisionTtl; - private final String hostMinTtl; - private final String hostHourTtl; - private final String hostDailyTtl; - private final String clusterSecTtl; - private final String clusterMinTtl; - private final String clusterHourTtl; - private final String clusterDailyTtl; + private HashMap<String, String> tableTTL = new HashMap<>(); public PhoenixHBaseAccessor(Configuration hbaseConf, Configuration metricsConf){ @@ -181,14 +173,14 @@ public class PhoenixHBaseAccessor { this.outOfBandTimeAllowance = metricsConf.getLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE, DEFAULT_OUT_OF_BAND_TIME_ALLOWANCE); - precisionTtl = getDaysInSeconds(metricsConf.get(PRECISION_TABLE_TTL, "1")); //1 day - hostMinTtl = getDaysInSeconds(metricsConf.get(HOST_MINUTE_TABLE_TTL, "7")); //7 days - hostHourTtl = getDaysInSeconds(metricsConf.get(HOST_HOUR_TABLE_TTL, "30")); //30 days - hostDailyTtl = getDaysInSeconds(metricsConf.get(HOST_DAILY_TABLE_TTL, "365")); //1 year - clusterSecTtl = getDaysInSeconds(metricsConf.get(CLUSTER_SECOND_TABLE_TTL, "7")); //7 days - clusterMinTtl = getDaysInSeconds(metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "30")); //30 days - clusterHourTtl = getDaysInSeconds(metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "365")); //1 year - clusterDailyTtl = getDaysInSeconds(metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "730")); //2 years + tableTTL.put(METRICS_RECORD_TABLE_NAME, getDaysInSeconds(metricsConf.get(PRECISION_TABLE_TTL, "1"))); //1 day + tableTTL.put(METRICS_AGGREGATE_MINUTE_TABLE_NAME, getDaysInSeconds(metricsConf.get(HOST_MINUTE_TABLE_TTL, "7"))); //7 days + tableTTL.put(METRICS_AGGREGATE_HOURLY_TABLE_NAME, getDaysInSeconds(metricsConf.get(HOST_HOUR_TABLE_TTL, "30"))); //30 days + tableTTL.put(METRICS_AGGREGATE_DAILY_TABLE_NAME, getDaysInSeconds(metricsConf.get(HOST_DAILY_TABLE_TTL, "365"))); //1 year + tableTTL.put(METRICS_CLUSTER_AGGREGATE_TABLE_NAME, getDaysInSeconds(metricsConf.get(CLUSTER_SECOND_TABLE_TTL, "7"))); //7 days + tableTTL.put(METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, getDaysInSeconds(metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "30"))); //30 days + tableTTL.put(METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, getDaysInSeconds(metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "365"))); //1 year + tableTTL.put(METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, getDaysInSeconds(metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "730"))); //2 years } private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs) @@ -252,73 +244,6 @@ public class PhoenixHBaseAccessor { return dataSource.getHBaseAdmin(); } - /** - * Set TTL on tables based on user settings - */ - protected void alterMetricTableTTL() { - Connection conn = null; - Statement stmt = null; - - try { - LOG.info("Initializing metrics schema..."); - conn = getConnectionRetryingOnException(); - stmt = conn.createStatement(); - - //alter TTL options to update tables - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_RECORD_TABLE_NAME, - precisionTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_AGGREGATE_MINUTE_TABLE_NAME, - hostMinTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_AGGREGATE_HOURLY_TABLE_NAME, - hostHourTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_AGGREGATE_DAILY_TABLE_NAME, - hostDailyTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_CLUSTER_AGGREGATE_TABLE_NAME, - clusterSecTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, - clusterMinTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, - clusterHourTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, - clusterDailyTtl)); - - conn.commit(); - - - } catch (InterruptedException e) { - LOG.warn("Error updating TTL on tables.", e); - } catch (SQLException sql) { - if (sql.getErrorCode() == SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE.getErrorCode()) { - LOG.warn("Update TTL on tables is unsupported by the phoenix version. " + sql.getMessage()); - } else { - LOG.warn("Error updating TTL on tables.", sql); - } - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException e) { - // Ignore - } - } - } - } - protected void initMetricSchema() { Connection conn = null; Statement stmt = null; @@ -342,33 +267,47 @@ public class PhoenixHBaseAccessor { // Host level String precisionSql = String.format(CREATE_METRICS_TABLE_SQL, - encoding, precisionTtl, compression); + encoding, tableTTL.get(METRICS_RECORD_TABLE_NAME), compression); String splitPoints = metricsConf.get(PRECISION_TABLE_SPLIT_POINTS); if (!StringUtils.isEmpty(splitPoints)) { precisionSql += getSplitPointsStr(splitPoints); } stmt.executeUpdate(precisionSql); stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL, - METRICS_AGGREGATE_MINUTE_TABLE_NAME, encoding, hostMinTtl, compression)); + METRICS_AGGREGATE_MINUTE_TABLE_NAME, encoding, + tableTTL.get(METRICS_AGGREGATE_MINUTE_TABLE_NAME), + compression)); stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL, - METRICS_AGGREGATE_HOURLY_TABLE_NAME, encoding, hostHourTtl, compression)); + METRICS_AGGREGATE_HOURLY_TABLE_NAME, encoding, + tableTTL.get(METRICS_AGGREGATE_HOURLY_TABLE_NAME), + compression)); stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL, - METRICS_AGGREGATE_DAILY_TABLE_NAME, encoding, hostDailyTtl, compression)); + METRICS_AGGREGATE_DAILY_TABLE_NAME, encoding, + tableTTL.get(METRICS_AGGREGATE_DAILY_TABLE_NAME), + compression)); // Cluster level String aggregateSql = String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL, - METRICS_CLUSTER_AGGREGATE_TABLE_NAME, encoding, clusterMinTtl, compression); + METRICS_CLUSTER_AGGREGATE_TABLE_NAME, encoding, + tableTTL.get(METRICS_CLUSTER_AGGREGATE_TABLE_NAME), + compression); splitPoints = metricsConf.get(AGGREGATE_TABLE_SPLIT_POINTS); if (!StringUtils.isEmpty(splitPoints)) { aggregateSql += getSplitPointsStr(splitPoints); } stmt.executeUpdate(aggregateSql); stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL, - METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, encoding, clusterHourTtl, compression)); + METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, encoding, + tableTTL.get(METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME), + compression)); stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL, - METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, encoding, clusterHourTtl, compression)); + METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, encoding, + tableTTL.get(METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME), + compression)); stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL, - METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, encoding, clusterDailyTtl, compression)); + METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, encoding, + tableTTL.get(METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME), + compression)); conn.commit(); @@ -396,7 +335,7 @@ public class PhoenixHBaseAccessor { } } - protected void initPolicies() { + protected void initPoliciesAndTTL() { boolean enableNormalizer = hbaseConf.getBoolean("hbase.normalizer.enabled", true); boolean enableFifoCompaction = metricsConf.getBoolean("timeline.metrics.hbase.fifo.compaction.enabled", true); @@ -449,6 +388,20 @@ public class PhoenixHBaseAccessor { " = " + 300 + " for " + tableName); modifyTable = true; } + // Change TTL setting to match user configuration + HColumnDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies(); + if (columnFamilies != null) { + for (HColumnDescriptor family : columnFamilies) { + String ttlValue = family.getValue("TTL"); + if (StringUtils.isEmpty(ttlValue) || + !ttlValue.trim().equals(tableTTL.get(tableName))) { + family.setValue("TTL", tableTTL.get(tableName)); + LOG.info("Setting TTL on table: " + tableName + " to : " + + tableTTL.get(tableName) + " seconds."); + modifyTable = true; + } + } + } // Persist only if anything changed if (modifyTable) { @@ -1383,8 +1336,8 @@ public class PhoenixHBaseAccessor { return metadataMap; } - private String getDaysInSeconds(String daysString) { + String getDaysInSeconds(String daysString) { double days = Double.valueOf(daysString.trim()); - return String.valueOf((int)(days*86400)); + return String.valueOf((int) (days * 86400)); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/f1894bd7/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java index e3e037a..93ba770 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.metrics2.sink.timeline.Precision; @@ -38,6 +39,7 @@ import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.lang.reflect.Field; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; @@ -53,10 +55,10 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createMetricHostAggregate; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.DEFAULT_COMPACTION_POLICY_CLASS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.FIFO_COMPACTION_POLICY_CLASS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.HSTORE_COMPACTION_CLASS_KEY; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES; @@ -335,9 +337,9 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { } @Test - public void testInitPolicies() throws Exception { + public void testInitPoliciesAndTTL() throws Exception { HBaseAdmin hBaseAdmin = hdb.getHBaseAdmin(); - + String precisionTtl = ""; // Verify policies are unset for (String tableName : PHOENIX_TABLES) { HTableDescriptor tableDescriptor = hBaseAdmin.getTableDescriptor(tableName.getBytes()); @@ -345,9 +347,22 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { Assert.assertFalse("Normalizer disabled by default.", tableDescriptor.isNormalizationEnabled()); Assert.assertNull("Default compaction policy is null.", tableDescriptor.getConfigurationValue(HSTORE_COMPACTION_CLASS_KEY)); + + for (HColumnDescriptor family : tableDescriptor.getColumnFamilies()) { + if (tableName.equals(METRICS_RECORD_TABLE_NAME)) { + precisionTtl = family.getValue("TTL"); + } + } + Assert.assertEquals("Precision TTL value.", hdb.getDaysInSeconds("1"), precisionTtl); } - hdb.initPolicies(); + Field f = PhoenixHBaseAccessor.class.getDeclaredField("tableTTL"); + f.setAccessible(true); + Map<String, String> precisionValues = (Map<String, String>) f.get(hdb); + precisionValues.put(METRICS_RECORD_TABLE_NAME, hdb.getDaysInSeconds("2")); + f.set(hdb, precisionValues); + + hdb.initPoliciesAndTTL(); // Verify expected policies are set boolean normalizerEnabled = false; @@ -364,11 +379,17 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { if (!normalizerEnabled || compactionPolicy == null) { Thread.sleep(2000l); } + if (tableName.equals(METRICS_RECORD_TABLE_NAME)) { + for (HColumnDescriptor family : tableDescriptor.getColumnFamilies()) { + precisionTtl = family.getValue("TTL"); + } + } } } Assert.assertTrue("Normalizer enabled.", normalizerEnabled); Assert.assertEquals("FIFO compaction policy is set.", FIFO_COMPACTION_POLICY_CLASS, compactionPolicy); + Assert.assertEquals("Precision TTL value not changed.", hdb.getDaysInSeconds("2"), precisionTtl); hBaseAdmin.close(); }
