AMBARI-15361. Fix ordering of Alter table calls which could result in Region Close issue. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6d3e2912 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6d3e2912 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6d3e2912 Branch: refs/heads/AMBARI-13364 Commit: 6d3e2912d49a5343bfdef54a04add9210ba16657 Parents: f7711af Author: Siddharth Wagle <[email protected]> Authored: Wed Mar 9 14:00:01 2016 -0800 Committer: Siddharth Wagle <[email protected]> Committed: Wed Mar 9 14:00:01 2016 -0800 ---------------------------------------------------------------------- .../timeline/HBaseTimelineMetricStore.java | 7 +- .../metrics/timeline/PhoenixHBaseAccessor.java | 134 ++++++++++++------- 2 files changed, 94 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/6d3e2912/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index f460292..465fe95 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -80,12 +80,15 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin Configuration metricsConf) { if (!isInitialized) { hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf); + // Initialize schema hBaseAccessor.initMetricSchema(); // Initialize metadata from store metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor, metricsConf); metricMetadataManager.initializeMetadata(); - + // Initialize policies before TTL update hBaseAccessor.initPolicies(); + // Alter TTL on tables + hBaseAccessor.alterMetricTableTTL(); if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) { LOG.info("Using group by aggregators for aggregating host and cluster metrics."); @@ -96,7 +99,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf, metricMetadataManager); scheduleAggregatorThread(secondClusterAggregator, metricsConf); -// // Start the minute cluster aggregator + // Start the minute cluster aggregator TimelineMetricAggregator minuteClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf); scheduleAggregatorThread(minuteClusterAggregator, metricsConf); http://git-wip-us.apache.org/repos/asf/ambari/blob/6d3e2912/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 09da6bf..8cfe9a9 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -150,6 +150,15 @@ public class PhoenixHBaseAccessor { static final String BLOCKING_STORE_FILES_KEY = "hbase.hstore.blockingStoreFiles"; + private final String precisionTtl; + private final String hostMinTtl; + private final String hostHourTtl; + private final String hostDailyTtl; + private final String clusterSecTtl; + private final String clusterMinTtl; + private final String clusterHourTtl; + private final String clusterDailyTtl; + public PhoenixHBaseAccessor(Configuration hbaseConf, Configuration metricsConf){ this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf)); @@ -173,6 +182,15 @@ public class PhoenixHBaseAccessor { this.outOfBandTimeAllowance = metricsConf.getLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE, DEFAULT_OUT_OF_BAND_TIME_ALLOWANCE); this.skipBlockCacheForAggregatorsEnabled = metricsConf.getBoolean(AGGREGATORS_SKIP_BLOCK_CACHE, false); + + precisionTtl = getDaysInSeconds(metricsConf.get(PRECISION_TABLE_TTL, "1")); //1 day + hostMinTtl = getDaysInSeconds(metricsConf.get(HOST_MINUTE_TABLE_TTL, "7")); //7 days + hostHourTtl = getDaysInSeconds(metricsConf.get(HOST_HOUR_TABLE_TTL, "30")); //30 days + hostDailyTtl = getDaysInSeconds(metricsConf.get(HOST_DAILY_TABLE_TTL, "365")); //1 year + clusterSecTtl = getDaysInSeconds(metricsConf.get(CLUSTER_SECOND_TABLE_TTL, "7")); //7 days + clusterMinTtl = getDaysInSeconds(metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "30")); //30 days + clusterHourTtl = getDaysInSeconds(metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "365")); //1 year + clusterDailyTtl = getDaysInSeconds(metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "730")); //2 years } private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs) @@ -236,20 +254,80 @@ public class PhoenixHBaseAccessor { return dataSource.getHBaseAdmin(); } + /** + * Set TTL on tables based on user settings + */ + protected void alterMetricTableTTL() { + Connection conn = null; + Statement stmt = null; + + try { + LOG.info("Initializing metrics schema..."); + conn = getConnectionRetryingOnException(); + stmt = conn.createStatement(); + + //alter TTL options to update tables + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_RECORD_TABLE_NAME, + precisionTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_AGGREGATE_MINUTE_TABLE_NAME, + hostMinTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_AGGREGATE_HOURLY_TABLE_NAME, + hostHourTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_AGGREGATE_DAILY_TABLE_NAME, + hostDailyTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_CLUSTER_AGGREGATE_TABLE_NAME, + clusterSecTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, + clusterMinTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, + clusterHourTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, + clusterDailyTtl)); + + conn.commit(); + + + } catch (InterruptedException e) { + LOG.warn("Error updating TTL on tables.", e); + } catch (SQLException sql) { + if (sql.getErrorCode() == SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE.getErrorCode()) { + LOG.warn("Update TTL on tables is unsupported by the phoenix version. " + sql.getMessage()); + } else { + LOG.warn("Error updating TTL on tables.", sql); + } + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + // Ignore + } + } + } + } + protected void initMetricSchema() { Connection conn = null; Statement stmt = null; String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING); String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION); - String precisionTtl = getDaysInSeconds(metricsConf.get(PRECISION_TABLE_TTL, "1")); //1 day - String hostMinTtl = getDaysInSeconds(metricsConf.get(HOST_MINUTE_TABLE_TTL, "7")); //7 days - String hostHourTtl = getDaysInSeconds(metricsConf.get(HOST_HOUR_TABLE_TTL, "30")); //30 days - String hostDailyTtl = getDaysInSeconds(metricsConf.get(HOST_DAILY_TABLE_TTL, "365")); //1 year - String clusterSecTtl = getDaysInSeconds(metricsConf.get(CLUSTER_SECOND_TABLE_TTL, "7")); //7 days - String clusterMinTtl = getDaysInSeconds(metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "30")); //30 days - String clusterHourTtl = getDaysInSeconds(metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "365")); //1 year - String clusterDailyTtl = getDaysInSeconds(metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "730")); //2 years + try { LOG.info("Initializing metrics schema..."); @@ -294,48 +372,14 @@ public class PhoenixHBaseAccessor { stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL, METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, encoding, clusterDailyTtl, compression)); - //alter TTL options to update tables - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_RECORD_TABLE_NAME, - precisionTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_AGGREGATE_MINUTE_TABLE_NAME, - hostMinTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_AGGREGATE_HOURLY_TABLE_NAME, - hostHourTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_AGGREGATE_DAILY_TABLE_NAME, - hostDailyTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_CLUSTER_AGGREGATE_TABLE_NAME, - clusterSecTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, - clusterMinTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, - clusterHourTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, - clusterDailyTtl)); conn.commit(); LOG.info("Metrics schema initialized."); - } catch (SQLException sql) { - if (sql.getErrorCode() == - SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE.getErrorCode()) { - LOG.warn("Cannot update TTL on tables. " + sql.getMessage()); - } else { - LOG.error("Error creating Metrics Schema in HBase using Phoenix.", sql); - throw new MetricsSystemInitializationException( - "Error creating Metrics Schema in HBase using Phoenix.", sql); - } - } catch (InterruptedException e) { - LOG.error("Error creating Metrics Schema in HBase using Phoenix.", e); + } catch (SQLException | InterruptedException sql) { + LOG.error("Error creating Metrics Schema in HBase using Phoenix.", sql); throw new MetricsSystemInitializationException( - "Error creating Metrics Schema in HBase using Phoenix.", e); + "Error creating Metrics Schema in HBase using Phoenix.", sql); } finally { if (stmt != null) { try {
