This is an automated email from the ASF dual-hosted git repository. payert pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push: new 0817311 AMBARI-25569 Reassess Ambari Metrics data migration (#3242) 0817311 is described below commit 081731162bb161787453423231f507ab7d3c6457 Author: Tamas Payer <35402259+pay...@users.noreply.github.com> AuthorDate: Tue Oct 20 11:03:43 2020 +0200 AMBARI-25569 Reassess Ambari Metrics data migration (#3242) --- .../upgrade/core/AbstractPhoenixMetricsCopier.java | 68 +++--- .../upgrade/core/MetricsDataMigrationLauncher.java | 232 +++++++++++---------- .../upgrade/core/PhoenixClusterMetricsCopier.java | 13 +- .../upgrade/core/PhoenixHostMetricsCopier.java | 12 +- 4 files changed, 164 insertions(+), 161 deletions(-) diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java index 3d2002b..d69f28a 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java @@ -20,6 +20,7 @@ package org.apache.ambari.metrics.core.timeline.upgrade.core; import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import java.io.FileWriter; import java.io.IOException; @@ -52,8 +53,8 @@ public abstract class AbstractPhoenixMetricsCopier implements Runnable { @Override public void run(){ LOG.info(String.format("Copying %s metrics from %s to %s", metricNames, inputTable, outputTable)); - long startTimer = System.currentTimeMillis(); - String query = String.format("SELECT %s %s FROM %s WHERE %s AND SERVER_TIME > %s ORDER BY METRIC_NAME, SERVER_TIME", + final long startTimer = System.currentTimeMillis(); + final String query = String.format("SELECT %s %s FROM %s WHERE %s AND SERVER_TIME > %s ORDER BY METRIC_NAME, SERVER_TIME", getQueryHint(startTime), getColumnsClause(), inputTable, getMetricNamesLikeClause(), startTime); runPhoenixQueryAndAddToResults(query); @@ -62,11 +63,12 @@ public abstract class AbstractPhoenixMetricsCopier implements Runnable { saveMetrics(); } catch (SQLException e) { LOG.error(e); - } - long estimatedTime = System.currentTimeMillis() - startTimer; - LOG.debug(String.format("Copying took %s seconds from table %s to table %s for metric names %s", estimatedTime/ 1000.0, inputTable, outputTable, metricNames)); + } finally { + final long estimatedTime = System.currentTimeMillis() - startTimer; + LOG.debug(String.format("Copying took %s seconds from table %s to table %s for metric names %s", estimatedTime/ 1000.0, inputTable, outputTable, metricNames)); - saveMetricsProgress(); + saveMetricsProgress(); + } } private String getMetricNamesLikeClause() { @@ -94,32 +96,15 @@ public abstract class AbstractPhoenixMetricsCopier implements Runnable { private void runPhoenixQueryAndAddToResults(String query) { LOG.debug(String.format("Running query: %s", query)); - Connection conn = null; - PreparedStatement stmt = null; - try { - conn = hBaseAccessor.getConnection(); - stmt = conn.prepareStatement(query); - ResultSet rs = stmt.executeQuery(); - while (rs.next()) { - addToResults(rs); + try (Connection conn = hBaseAccessor.getConnection(); + PreparedStatement stmt = conn.prepareStatement(query)) { + try (ResultSet rs = stmt.executeQuery()) { + while (rs.next()) { + addToResults(rs); + } } } catch (SQLException e) { LOG.error(String.format("Exception during running phoenix query %s", query), e); - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException e) { - // Ignore - } - } } } @@ -127,21 +112,23 @@ public abstract class AbstractPhoenixMetricsCopier implements Runnable { * Saves processed metric names info provided file in format TABLE_NAME:METRIC_NAME */ private void saveMetricsProgress() { - if (processedMetricsFile == null) { + if (this.processedMetricsFile == null) { LOG.info("Skipping metrics progress save as the file is null"); return; } - for (String metricName : metricNames) { - try { - processedMetricsFile.append(inputTable + ":" + metricName + System.lineSeparator()); - } catch (IOException e) { - LOG.error(e); + synchronized (this.processedMetricsFile) { + for (String metricName : metricNames) { + try { + this.processedMetricsFile.append(inputTable).append(":").append(metricName).append(System.lineSeparator()); + } catch (IOException e) { + LOG.error(e); + } } } } protected String getQueryHint(Long startTime) { - StringBuilder sb = new StringBuilder(); + final StringBuilder sb = new StringBuilder(); sb.append("/*+ "); sb.append("NATIVE_TIME_RANGE("); sb.append(startTime - DEFAULT_NATIVE_TIME_RANGE_DELAY); @@ -150,6 +137,15 @@ public abstract class AbstractPhoenixMetricsCopier implements Runnable { return sb.toString(); } + protected MetricHostAggregate extractMetricHostAggregate(ResultSet rs) throws SQLException { + MetricHostAggregate metricHostAggregate = new MetricHostAggregate(); + metricHostAggregate.setSum(rs.getDouble("METRIC_SUM")); + metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT")); + metricHostAggregate.setMax(rs.getDouble("METRIC_MAX")); + metricHostAggregate.setMin(rs.getDouble("METRIC_MIN")); + return metricHostAggregate; + } + /** * Saves aggregated metrics to the Hbase * @throws SQLException diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java index 3a25aee..889158f 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java @@ -21,10 +21,12 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor; import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey; import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import java.io.BufferedReader; import java.io.FileInputStream; @@ -34,6 +36,7 @@ import java.io.InputStreamReader; import java.net.MalformedURLException; import java.net.URISyntaxException; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.sql.SQLException; import java.util.Collections; @@ -44,6 +47,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; @@ -62,7 +66,7 @@ import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.M public class MetricsDataMigrationLauncher { private static final Log LOG = LogFactory.getLog(MetricsDataMigrationLauncher.class); private static final Long DEFAULT_TIMEOUT_MINUTES = 60*24L; - private static String patternPrefix = "._p_"; + private static final String PATTERN_PREFIX = "._p_"; private static final int DEFAULT_BATCH_SIZE = 5; public static final Map<String, String> CLUSTER_AGGREGATE_TABLES_MAPPING = new HashMap<>(); public static final Map<String, String> HOST_AGGREGATE_TABLES_MAPPING = new HashMap<>(); @@ -83,127 +87,142 @@ public class MetricsDataMigrationLauncher { private final Set<Set<String>> metricNamesBatches; private final String processedMetricsFilePath; - private Set<String> metricNames; - private Long startTime; - private Integer batchSize; - private Integer numberOfThreads; + private final Long startTime; + private final Integer numberOfThreads; private TimelineMetricConfiguration timelineMetricConfiguration; private PhoenixHBaseAccessor hBaseAccessor; private TimelineMetricMetadataManager timelineMetricMetadataManager; private Map<String, Set<String>> processedMetrics; public MetricsDataMigrationLauncher(String whitelistedFilePath, String processedMetricsFilePath, Long startTime, Integer numberOfThreads, Integer batchSize) throws Exception { - this.startTime = startTime == null? DEFAULT_START_TIME : startTime; - this.numberOfThreads = numberOfThreads == null? DEFAULT_NUMBER_OF_THREADS : numberOfThreads; - this.batchSize = batchSize == null? DEFAULT_BATCH_SIZE : batchSize; - this.processedMetricsFilePath = processedMetricsFilePath == null? DEFAULT_PROCESSED_METRICS_FILE_LOCATION : processedMetricsFilePath; + this.startTime = (startTime == null) ? DEFAULT_START_TIME : startTime; + this.numberOfThreads = (numberOfThreads == null) ? DEFAULT_NUMBER_OF_THREADS : numberOfThreads; + this.processedMetricsFilePath = (processedMetricsFilePath == null) ? DEFAULT_PROCESSED_METRICS_FILE_LOCATION : processedMetricsFilePath; initializeHbaseAccessor(); - - LOG.info("Looking for whitelisted metric names..."); - - if (whitelistedFilePath != null) { - this.metricNames = readMetricWhitelistFromFile(whitelistedFilePath); - } else { - String whitelistFile = timelineMetricConfiguration.getMetricsConf().get(TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE, TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT); - metricNames = readMetricWhitelistFromFile(whitelistFile); - } - readProcessedMetricsMap(); + final Set<String> metricNames = getMetricNames(whitelistedFilePath); + LOG.info("Setting up batches..."); - this.metricNamesBatches = new HashSet<>(); + if (batchSize == null) batchSize = DEFAULT_BATCH_SIZE; + this.metricNamesBatches = new HashSet<>(batchSize); - Iterables.partition(metricNames, this.batchSize) + Iterables.partition(metricNames, batchSize) .forEach(batch -> metricNamesBatches.add(new HashSet<>(batch))); - LOG.info(String.format("Split metric names into %s batches with size of %s", metricNamesBatches.size(), this.batchSize)); + LOG.info(String.format("Split metric names into %s batches with size of %s", metricNamesBatches.size(), batchSize)); } + private Set<String> getMetricNames(String whitelistedFilePath) throws MalformedURLException, URISyntaxException, SQLException { + if(whitelistedFilePath != null) { + LOG.info(String.format("Whitelist file %s has been provided.", whitelistedFilePath)); + LOG.info("Looking for whitelisted metric names based on the file content..."); + return readMetricWhitelistFromFile(whitelistedFilePath); + } - private void readProcessedMetricsMap() { - Map<String, Set<String>> result = new HashMap<>(); - if (!Files.exists(Paths.get(processedMetricsFilePath))) { - LOG.info(String.format("The processed metrics file %s is missing, assuming there were no metrics processed.", processedMetricsFilePath)); - this.processedMetrics = new HashMap<>(); + final Configuration conf = this.timelineMetricConfiguration.getMetricsConf(); + if (Boolean.parseBoolean(conf.get(TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_ENABLED))) { + whitelistedFilePath = conf.get(TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE, + TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT); + LOG.info(String.format("No whitelist file has been provided but Ambari Metrics Whitelisting is enabled. " + + "Using %s as whitelist file.", whitelistedFilePath)); + LOG.info("Looking for whitelisted metric names based on the file content..."); + return readMetricWhitelistFromFile(whitelistedFilePath); } - LOG.info(String.format("Reading the list of already copied metrics from %s", processedMetricsFilePath)); - try { - try (Stream<String> stream = Files.lines(Paths.get(processedMetricsFilePath))) { - stream.forEach( line -> { - String [] lineSplit = line.split(":"); - if (!result.containsKey(lineSplit[0])) { - result.put(lineSplit[0], new HashSet<>(Collections.singletonList(lineSplit[1]))); - } else { - result.get(lineSplit[0]).add(lineSplit[1]); - } - }); + + LOG.info("No whitelist file has been provided and Ambari Metrics Whitelisting is disabled."); + LOG.info("Looking for all the metric names in the Metrics Database..."); + return this.hBaseAccessor.getTimelineMetricMetadataV1().keySet().stream() + .map(TimelineMetricMetadataKey::getMetricName).collect(Collectors.toSet()); + } + + private void readProcessedMetricsMap() { + final Map<String, Set<String>> result = new HashMap<>(); + final Path path = Paths.get(this.processedMetricsFilePath); + + if (Files.notExists(path)) { + LOG.info(String.format("The processed metrics file %s is missing, assuming there were no metrics processed.", this.processedMetricsFilePath)); + } else { + LOG.info(String.format("Reading the list of already copied metrics from %s", this.processedMetricsFilePath)); + try { + try (Stream<String> stream = Files.lines(path)) { + stream.forEach(line -> { + String[] lineSplit = line.split(":"); + if (!result.containsKey(lineSplit[0])) { + result.put(lineSplit[0], new HashSet<>(Collections.singletonList(lineSplit[1]))); + } else { + result.get(lineSplit[0]).add(lineSplit[1]); + } + }); + } + } catch (IOException e) { + LOG.error(e); } - } catch (IOException e) { - LOG.error(e); } this.processedMetrics = result; } public void runMigration(Long timeoutInMinutes) throws IOException { - - FileWriter processedMetricsFileWriter = new FileWriter(processedMetricsFilePath, true); - LOG.info("Setting up copiers..."); - Set<AbstractPhoenixMetricsCopier> copiers = new HashSet<>(); - for (Set<String> batch : metricNamesBatches) { - for (Map.Entry<String, String> entry : CLUSTER_AGGREGATE_TABLES_MAPPING.entrySet()) { - Set<String> filteredMetrics = filterProcessedMetrics(batch, this.processedMetrics, entry.getKey()); - if (!filteredMetrics.isEmpty()) { - copiers.add(new PhoenixClusterMetricsCopier(entry.getKey(), entry.getValue(), hBaseAccessor, - filteredMetrics, startTime, processedMetricsFileWriter)); + try (FileWriter processedMetricsFileWriter = new FileWriter(this.processedMetricsFilePath, true)) { + LOG.info("Setting up copiers..."); + Set<AbstractPhoenixMetricsCopier> copiers = new HashSet<>(); + for (Set<String> batch : metricNamesBatches) { + for (Map.Entry<String, String> entry : CLUSTER_AGGREGATE_TABLES_MAPPING.entrySet()) { + Set<String> filteredMetrics = filterProcessedMetrics(batch, this.processedMetrics, entry.getKey()); + if (!filteredMetrics.isEmpty()) { + copiers.add(new PhoenixClusterMetricsCopier(entry.getKey(), entry.getValue(), this.hBaseAccessor, + filteredMetrics, this.startTime, processedMetricsFileWriter)); + } } - } - for (Map.Entry<String, String> entry : HOST_AGGREGATE_TABLES_MAPPING.entrySet()) { - Set<String> filteredMetrics = filterProcessedMetrics(batch, processedMetrics, entry.getKey()); - if (!filteredMetrics.isEmpty()) { - copiers.add(new PhoenixHostMetricsCopier(entry.getKey(), entry.getValue(), hBaseAccessor, - filteredMetrics, startTime, processedMetricsFileWriter)); + for (Map.Entry<String, String> entry : HOST_AGGREGATE_TABLES_MAPPING.entrySet()) { + Set<String> filteredMetrics = filterProcessedMetrics(batch, this.processedMetrics, entry.getKey()); + if (!filteredMetrics.isEmpty()) { + copiers.add(new PhoenixHostMetricsCopier(entry.getKey(), entry.getValue(), this.hBaseAccessor, + filteredMetrics, this.startTime, processedMetricsFileWriter)); + } } } - } - if (copiers.isEmpty()) { - LOG.info("No copy threads to run, looks like all metrics have been copied."); - processedMetricsFileWriter.close(); - return; - } - - LOG.info("Running the copy threads..."); - long startTimer = System.currentTimeMillis(); - ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads == null ? DEFAULT_NUMBER_OF_THREADS : numberOfThreads); - for (AbstractPhoenixMetricsCopier copier : copiers) { - executorService.submit(copier); - } + if (copiers.isEmpty()) { + LOG.info("No copy threads to run, looks like all metrics have been copied."); + return; + } - executorService.shutdown(); + LOG.info("Running the copy threads..."); + long startTimer = System.currentTimeMillis(); + ExecutorService executorService = null; + try { + executorService = Executors.newFixedThreadPool(this.numberOfThreads); + for (AbstractPhoenixMetricsCopier copier : copiers) { + executorService.submit(copier); + } + } finally { + if (executorService != null) { + executorService.shutdown(); + try { + executorService.awaitTermination(timeoutInMinutes, TimeUnit.MINUTES); + } catch (InterruptedException e) { + LOG.error(e); + } + } + } - try { - executorService.awaitTermination(timeoutInMinutes, TimeUnit.MINUTES); - } catch (InterruptedException e) { - LOG.error(e); + long estimatedTime = System.currentTimeMillis() - startTimer; + LOG.info(String.format("Copying took %s seconds", estimatedTime / 1000.0)); } - - long estimatedTime = System.currentTimeMillis() - startTimer; - LOG.info(String.format("Copying took %s seconds", estimatedTime/1000.0)); - - processedMetricsFileWriter.close(); } private void initializeHbaseAccessor() throws MalformedURLException, URISyntaxException { this.hBaseAccessor = new PhoenixHBaseAccessor(null); this.timelineMetricConfiguration = TimelineMetricConfiguration.getInstance(); - timelineMetricConfiguration.initialize(); + this.timelineMetricConfiguration.initialize(); - timelineMetricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor); - timelineMetricMetadataManager.initializeMetadata(false); + this.timelineMetricMetadataManager = new TimelineMetricMetadataManager(this.hBaseAccessor); + this.timelineMetricMetadataManager.initializeMetadata(false); - hBaseAccessor.setMetadataInstance(timelineMetricMetadataManager); + this.hBaseAccessor.setMetadataInstance(this.timelineMetricMetadataManager); } private static Set<String> filterProcessedMetrics(Set<String> metricNames, Map<String, Set<String>> processedMetrics, String tableName) { @@ -219,21 +238,18 @@ public class MetricsDataMigrationLauncher { */ private static Set<String> readMetricWhitelistFromFile(String whitelistFile) { LOG.info(String.format("Reading metric names from %s", whitelistFile)); - Set<String> whitelistedMetrics = new HashSet<>(); + final Set<String> whitelistedMetrics = new HashSet<>(); - BufferedReader br = null; String strLine; - try(FileInputStream fstream = new FileInputStream(whitelistFile)) { - br = new BufferedReader(new InputStreamReader(fstream)); - + try(BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(whitelistFile)))) { while ((strLine = br.readLine()) != null) { strLine = strLine.trim(); if (StringUtils.isEmpty(strLine)) { continue; } - if (strLine.startsWith(patternPrefix)) { - strLine = strLine.replace(patternPrefix, ""); + if (strLine.startsWith(PATTERN_PREFIX)) { + strLine = strLine.replace(PATTERN_PREFIX, ""); } if (strLine.contains("*")) { strLine = strLine.replaceAll("\\*", "%"); @@ -248,20 +264,21 @@ public class MetricsDataMigrationLauncher { private void saveMetadata() throws SQLException { LOG.info("Saving metadata to store..."); - timelineMetricMetadataManager.updateMetadataCacheUsingV1Tables(); - timelineMetricMetadataManager.forceMetricsMetadataSync(); + this.timelineMetricMetadataManager.updateMetadataCacheUsingV1Tables(); + this.timelineMetricMetadataManager.forceMetricsMetadataSync(); LOG.info("Metadata was saved."); } - /** * * @param args * REQUIRED args[0] - processedMetricsFilePath - full path to the file where processed metric are/will be stored * * OPTIONAL args[1] - whitelistedFilePath - full path to the file with whitelisted metrics filenames - * if not provided the default whitelist file location will be used if configured - * if not configured - will result in error + * if not provided and AMS whitelisting is enabled the default whitelist + * file location will be used if configured + * if not provided and AMS whitelisting is disabled then no whitelisting + * will be used and all the metrics will be migrated * args[2] - startTime - default value is set to the last 30 days * args[3] - numberOfThreads - default value is 3 * args[4] - batchSize - default value is 5 @@ -303,24 +320,23 @@ public class MetricsDataMigrationLauncher { System.exit(1); } + int exitCode = 0; try { - //Setup shutdown hook for metadata save. - MetricsDataMigrationLauncher finalDataMigrationLauncher = dataMigrationLauncher; - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - finalDataMigrationLauncher.saveMetadata(); - } catch (SQLException e) { - LOG.error("Exception during metadata saving, exiting...", e); - } - })); - dataMigrationLauncher.runMigration(timeoutInMinutes); - } catch (IOException e) { + } catch (Throwable e) { + exitCode = 1; LOG.error("Exception during data migration, exiting...", e); - System.exit(1); + } finally { + try { + dataMigrationLauncher.saveMetadata(); + } catch (SQLException e) { + exitCode = 1; + LOG.error("Exception while saving the Metadata, exiting...", e); + } } - System.exit(0); + if(exitCode == 0) LOG.info("Data migration finished successfully."); + System.exit(exitCode); } } diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java index ee65f00..037b3d2 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java @@ -32,7 +32,7 @@ import java.util.Set; public class PhoenixClusterMetricsCopier extends AbstractPhoenixMetricsCopier { private static final Log LOG = LogFactory.getLog(PhoenixClusterMetricsCopier.class); - private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMap = new HashMap<>(); + private final Map<TimelineClusterMetric, MetricHostAggregate> aggregateMap = new HashMap<>(); PhoenixClusterMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime, FileWriter processedMetricsFileWriter) { super(inputTableName, outputTableName, hBaseAccessor, metricNames, startTime, processedMetricsFileWriter); @@ -53,7 +53,7 @@ public class PhoenixClusterMetricsCopier extends AbstractPhoenixMetricsCopier { @Override protected void saveMetrics() throws SQLException { LOG.debug(String.format("Saving %s results read from %s into %s", aggregateMap.size(), inputTable, outputTable)); - hBaseAccessor.saveClusterAggregateRecordsSecond(aggregateMap, outputTable); + this.hBaseAccessor.saveClusterAggregateRecordsSecond(aggregateMap, outputTable); } @Override @@ -62,13 +62,8 @@ public class PhoenixClusterMetricsCopier extends AbstractPhoenixMetricsCopier { rs.getString("METRIC_NAME"), rs.getString("APP_ID"), rs.getString("INSTANCE_ID"), rs.getLong("SERVER_TIME")); - MetricHostAggregate metricHostAggregate = new MetricHostAggregate(); - metricHostAggregate.setSum(rs.getDouble("METRIC_SUM")); - metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT")); - metricHostAggregate.setMax(rs.getDouble("METRIC_MAX")); - metricHostAggregate.setMin(rs.getDouble("METRIC_MIN")); - - aggregateMap.put(timelineMetric, metricHostAggregate); + MetricHostAggregate metricHostAggregate = extractMetricHostAggregate(rs); + this.aggregateMap.put(timelineMetric, metricHostAggregate); } } diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java index a4f0c23..11c1df9 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java @@ -32,7 +32,7 @@ import java.util.Set; public class PhoenixHostMetricsCopier extends AbstractPhoenixMetricsCopier { private static final Log LOG = LogFactory.getLog(PhoenixHostMetricsCopier.class); - private Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>(); + private final Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>(); PhoenixHostMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime, FileWriter processedMetricsFileWriter) { super(inputTableName, outputTableName, hBaseAccessor, metricNames, startTime, processedMetricsFileWriter); @@ -54,7 +54,7 @@ public class PhoenixHostMetricsCopier extends AbstractPhoenixMetricsCopier { @Override protected void saveMetrics() throws SQLException { LOG.debug(String.format("Saving %s results read from %s into %s", aggregateMap.size(), inputTable, outputTable)); - hBaseAccessor.saveHostAggregateRecords(aggregateMap, outputTable); + this.hBaseAccessor.saveHostAggregateRecords(aggregateMap, outputTable); } @Override @@ -66,12 +66,8 @@ public class PhoenixHostMetricsCopier extends AbstractPhoenixMetricsCopier { timelineMetric.setInstanceId(rs.getString("INSTANCE_ID")); timelineMetric.setStartTime(rs.getLong("SERVER_TIME")); - MetricHostAggregate metricHostAggregate = new MetricHostAggregate(); - metricHostAggregate.setSum(rs.getDouble("METRIC_SUM")); - metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT")); - metricHostAggregate.setMax(rs.getDouble("METRIC_MAX")); - metricHostAggregate.setMin(rs.getDouble("METRIC_MIN")); + MetricHostAggregate metricHostAggregate = extractMetricHostAggregate(rs); - aggregateMap.put(timelineMetric, metricHostAggregate); + this.aggregateMap.put(timelineMetric, metricHostAggregate); } }