This is an automated email from the ASF dual-hosted git repository.
payert pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new a4990d9 AMBARI-25569 Reassess Ambari Metrics data migration - 2nd
part (#3254)
a4990d9 is described below
commit a4990d9431643b82d1ec35ae1cd0c0739b5f7035
Author: Tamas Payer <[email protected]>
AuthorDate: Fri Dec 4 11:21:43 2020 +0100
AMBARI-25569 Reassess Ambari Metrics data migration - 2nd part (#3254)
* Introduce --allmetrics to enforce migration of all metrics regardless of
other settings.
Due to the suboptimal argument handling, if one wants to define an
argument that comes after the 'whitelist file'
argument - like the 'starttime' - the whitelist file cannot be left empty,
so the --allmetrics argument can be provided instead.
Change-Id: I1df6eb7ecdddb412c08c7cc48781da2679d2b75d
* Improve performance by reducing synchronization granularity and
leveraging BufferedWriter
Change-Id: I26dced203a49e69e428ec7802fc3fb7dd7a68baf
* Fix erroneous startTime handling
When external starttime was specified it was not subtracted from the actual
time.
Change-Id: I52815fc04548ed9cf11505f311f0f893db9bf352
* Adding log message about the migration time frame.
Change-Id: Ibdfd747239ffe2153ec4293b68513a417aee8cb2
* Fix review change requests
Change-Id: I714d3f98fbade93ba69e8ef0dd75e630a744710a
Co-authored-by: Tamas Payer <[email protected]>
---
.../upgrade/core/AbstractPhoenixMetricsCopier.java | 48 +++++++----------
.../upgrade/core/MetricsDataMigrationLauncher.java | 62 ++++++++++++++++------
.../upgrade/core/PhoenixClusterMetricsCopier.java | 4 +-
.../upgrade/core/PhoenixHostMetricsCopier.java | 5 +-
4 files changed, 69 insertions(+), 50 deletions(-)
diff --git
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
index d69f28a..8d075fe 100644
---
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
+++
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
@@ -22,8 +22,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import java.io.FileWriter;
import java.io.IOException;
+import java.io.Writer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -32,16 +32,16 @@ import java.util.Set;
public abstract class AbstractPhoenixMetricsCopier implements Runnable {
private static final Log LOG =
LogFactory.getLog(AbstractPhoenixMetricsCopier.class);
- private static final Long DEFAULT_NATIVE_TIME_RANGE_DELAY = 120000L;
- private final Long startTime;
- protected final FileWriter processedMetricsFile;
+ private static final long DEFAULT_NATIVE_TIME_RANGE_DELAY = 120000L;
+ private final long startTime;
+ protected final Writer processedMetricsFile;
protected String inputTable;
protected String outputTable;
protected Set<String> metricNames;
protected PhoenixHBaseAccessor hBaseAccessor;
public AbstractPhoenixMetricsCopier(String inputTableName, String
outputTableName, PhoenixHBaseAccessor hBaseAccessor,
- Set<String> metricNames, Long startTime,
FileWriter outputStream) {
+ Set<String> metricNames, long startTime,
Writer outputStream) {
this.inputTable = inputTableName;
this.outputTable = outputTableName;
this.hBaseAccessor = hBaseAccessor;
@@ -53,8 +53,8 @@ public abstract class AbstractPhoenixMetricsCopier implements
Runnable {
@Override
public void run(){
LOG.info(String.format("Copying %s metrics from %s to %s", metricNames,
inputTable, outputTable));
- final long startTimer = System.currentTimeMillis();
- final String query = String.format("SELECT %s %s FROM %s WHERE %s AND
SERVER_TIME > %s ORDER BY METRIC_NAME, SERVER_TIME",
+ long timerStart = System.currentTimeMillis();
+ String query = String.format("SELECT %s %s FROM %s WHERE %s AND
SERVER_TIME > %s ORDER BY METRIC_NAME, SERVER_TIME",
getQueryHint(startTime), getColumnsClause(), inputTable,
getMetricNamesLikeClause(), startTime);
runPhoenixQueryAndAddToResults(query);
@@ -64,24 +64,19 @@ public abstract class AbstractPhoenixMetricsCopier
implements Runnable {
} catch (SQLException e) {
LOG.error(e);
} finally {
- final long estimatedTime = System.currentTimeMillis() - startTimer;
- LOG.debug(String.format("Copying took %s seconds from table %s to table
%s for metric names %s", estimatedTime/ 1000.0, inputTable, outputTable,
metricNames));
+ long timerDelta = System.currentTimeMillis() - timerStart;
+ LOG.debug(String.format("Copying took %s seconds from table %s to table
%s for metric names %s", timerDelta/ 1000.0, inputTable, outputTable,
metricNames));
saveMetricsProgress();
}
}
private String getMetricNamesLikeClause() {
- StringBuilder sb = new StringBuilder();
+ StringBuilder sb = new StringBuilder(256);
sb.append('(');
int i = 0;
for (String metricName : metricNames) {
- sb.append("METRIC_NAME");
- sb.append(" LIKE ");
- sb.append("'");
- sb.append(metricName);
- sb.append("'");
-
+ sb.append("METRIC_NAME LIKE '").append(metricName).append("'");
if (i < metricNames.size() - 1) {
sb.append(" OR ");
}
@@ -116,25 +111,20 @@ public abstract class AbstractPhoenixMetricsCopier
implements Runnable {
LOG.info("Skipping metrics progress save as the file is null");
return;
}
- synchronized (this.processedMetricsFile) {
- for (String metricName : metricNames) {
- try {
+
+ for (String metricName : metricNames) {
+ try {
+ synchronized (this.processedMetricsFile) {
this.processedMetricsFile.append(inputTable).append(":").append(metricName).append(System.lineSeparator());
- } catch (IOException e) {
- LOG.error(e);
}
+ } catch (IOException e) {
+ LOG.error(e);
}
}
}
- protected String getQueryHint(Long startTime) {
- final StringBuilder sb = new StringBuilder();
- sb.append("/*+ ");
- sb.append("NATIVE_TIME_RANGE(");
- sb.append(startTime - DEFAULT_NATIVE_TIME_RANGE_DELAY);
- sb.append(") ");
- sb.append("*/");
- return sb.toString();
+ protected String getQueryHint(long startTime) {
+ return new StringBuilder().append("/*+
NATIVE_TIME_RANGE(").append(startTime -
DEFAULT_NATIVE_TIME_RANGE_DELAY).append(") */").toString();
}
protected MetricHostAggregate extractMetricHostAggregate(ResultSet rs)
throws SQLException {
diff --git
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
index 889158f..0c2f8e6 100644
---
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
+++
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
@@ -28,17 +28,22 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+
import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.Writer;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -68,12 +73,12 @@ public class MetricsDataMigrationLauncher {
private static final Long DEFAULT_TIMEOUT_MINUTES = 60*24L;
private static final String PATTERN_PREFIX = "._p_";
private static final int DEFAULT_BATCH_SIZE = 5;
+ private static final String MIGRATE_ALL_METRICS_ARG = "--allmetrics";
public static final Map<String, String> CLUSTER_AGGREGATE_TABLES_MAPPING =
new HashMap<>();
public static final Map<String, String> HOST_AGGREGATE_TABLES_MAPPING = new
HashMap<>();
public static final String DEFAULT_PROCESSED_METRICS_FILE_LOCATION =
"/var/log/ambari-metrics-collector/ambari-metrics-migration-state.txt";
public static final int DEFAULT_NUMBER_OF_THREADS = 3;
- public static final long ONE_MONTH_MILLIS = 2592000000L;
- public static final long DEFAULT_START_TIME = System.currentTimeMillis() -
ONE_MONTH_MILLIS; //Last month
+ public static final int DEFAULT_START_DAYS = 30; // 30 Days, Last month
static {
CLUSTER_AGGREGATE_TABLES_MAPPING.put(METRICS_CLUSTER_AGGREGATE_MINUTE_V1_TABLE_NAME,
METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME);
@@ -88,15 +93,15 @@ public class MetricsDataMigrationLauncher {
private final Set<Set<String>> metricNamesBatches;
private final String processedMetricsFilePath;
- private final Long startTime;
- private final Integer numberOfThreads;
+ private final long startTimeEpoch;
+ private final int numberOfThreads;
private TimelineMetricConfiguration timelineMetricConfiguration;
private PhoenixHBaseAccessor hBaseAccessor;
private TimelineMetricMetadataManager timelineMetricMetadataManager;
private Map<String, Set<String>> processedMetrics;
- public MetricsDataMigrationLauncher(String whitelistedFilePath, String
processedMetricsFilePath, Long startTime, Integer numberOfThreads, Integer
batchSize) throws Exception {
- this.startTime = (startTime == null) ? DEFAULT_START_TIME : startTime;
+ public MetricsDataMigrationLauncher(String whitelistedFilePath, String
processedMetricsFilePath, Long startDay, Integer numberOfThreads, Integer
batchSize) throws Exception {
+ this.startTimeEpoch = calculateStartEpochTime(startDay);
this.numberOfThreads = (numberOfThreads == null) ?
DEFAULT_NUMBER_OF_THREADS : numberOfThreads;
this.processedMetricsFilePath = (processedMetricsFilePath == null) ?
DEFAULT_PROCESSED_METRICS_FILE_LOCATION : processedMetricsFilePath;
@@ -114,8 +119,29 @@ public class MetricsDataMigrationLauncher {
LOG.info(String.format("Split metric names into %s batches with size of
%s", metricNamesBatches.size(), batchSize));
}
+ private long calculateStartEpochTime(Long startDay) {
+ final long days;
+ if (startDay == null) {
+ LOG.info(String.format("No starting day have been provided, using
default: %d", DEFAULT_START_DAYS));
+ days = DEFAULT_START_DAYS;
+ } else {
+ LOG.info(String.format("%d days have been provided as migration starting
day.", startDay));
+ days = startDay;
+ }
+ LOG.info(String.format("The last %d days' data will be migrated.", days));
+
+ return LocalDateTime.now().minusDays(days).toEpochSecond(ZoneOffset.UTC);
+ }
+
private Set<String> getMetricNames(String whitelistedFilePath) throws
MalformedURLException, URISyntaxException, SQLException {
- if(whitelistedFilePath != null) {
+ if(StringUtils.isNotEmpty(whitelistedFilePath) &&
whitelistedFilePath.equalsIgnoreCase(MIGRATE_ALL_METRICS_ARG)) {
+ LOG.info("Migration of all metrics has been requested by the " +
MIGRATE_ALL_METRICS_ARG + " argument.");
+ LOG.info("Looking for all the metric names in the Metrics Database...");
+ return this.hBaseAccessor.getTimelineMetricMetadataV1().keySet().stream()
+
.map(TimelineMetricMetadataKey::getMetricName).collect(Collectors.toSet());
+ }
+
+ if(StringUtils.isNotEmpty(whitelistedFilePath)) {
LOG.info(String.format("Whitelist file %s has been provided.",
whitelistedFilePath));
LOG.info("Looking for whitelisted metric names based on the file
content...");
return readMetricWhitelistFromFile(whitelistedFilePath);
@@ -164,7 +190,7 @@ public class MetricsDataMigrationLauncher {
}
public void runMigration(Long timeoutInMinutes) throws IOException {
- try (FileWriter processedMetricsFileWriter = new
FileWriter(this.processedMetricsFilePath, true)) {
+ try (Writer processedMetricsFileWriter = new BufferedWriter(new
FileWriter(this.processedMetricsFilePath, true))) {
LOG.info("Setting up copiers...");
Set<AbstractPhoenixMetricsCopier> copiers = new HashSet<>();
for (Set<String> batch : metricNamesBatches) {
@@ -172,7 +198,7 @@ public class MetricsDataMigrationLauncher {
Set<String> filteredMetrics = filterProcessedMetrics(batch,
this.processedMetrics, entry.getKey());
if (!filteredMetrics.isEmpty()) {
copiers.add(new PhoenixClusterMetricsCopier(entry.getKey(),
entry.getValue(), this.hBaseAccessor,
- filteredMetrics, this.startTime, processedMetricsFileWriter));
+ filteredMetrics, this.startTimeEpoch,
processedMetricsFileWriter));
}
}
@@ -180,7 +206,7 @@ public class MetricsDataMigrationLauncher {
Set<String> filteredMetrics = filterProcessedMetrics(batch,
this.processedMetrics, entry.getKey());
if (!filteredMetrics.isEmpty()) {
copiers.add(new PhoenixHostMetricsCopier(entry.getKey(),
entry.getValue(), this.hBaseAccessor,
- filteredMetrics, this.startTime, processedMetricsFileWriter));
+ filteredMetrics, this.startTimeEpoch,
processedMetricsFileWriter));
}
}
}
@@ -191,7 +217,7 @@ public class MetricsDataMigrationLauncher {
}
LOG.info("Running the copy threads...");
- long startTimer = System.currentTimeMillis();
+ long timerStart = System.currentTimeMillis();
ExecutorService executorService = null;
try {
executorService = Executors.newFixedThreadPool(this.numberOfThreads);
@@ -209,8 +235,8 @@ public class MetricsDataMigrationLauncher {
}
}
- long estimatedTime = System.currentTimeMillis() - startTimer;
- LOG.info(String.format("Copying took %s seconds", estimatedTime /
1000.0));
+ long timerDelta = System.currentTimeMillis() - timerStart;
+ LOG.info(String.format("Copying took %s seconds", timerDelta / 1000.0));
}
}
@@ -279,7 +305,9 @@ public class MetricsDataMigrationLauncher {
* file location will be used
if configured
* if not provided and AMS
whitelisting is disabled then no whitelisting
* will be used and all the
metrics will be migrated
- * args[2] - startTime - default value is set to the
last 30 days
+ * if --allmetrics switch is
provided then all the metrics will be migrated
+ * regardless to other settings
+ * args[2] - startDay - default value is set to the
last 30 days
* args[3] - numberOfThreads - default value is 3
* args[4] - batchSize - default value is 5
* args[5] - timeoutInMinutes - default value is set to the
equivalent of 24 hours
@@ -287,7 +315,7 @@ public class MetricsDataMigrationLauncher {
public static void main(String[] args) {
String processedMetricsFilePath = null;
String whitelistedFilePath = null;
- Long startTime = null;
+ Long startDay = null;
Integer numberOfThreads = null;
Integer batchSize = null;
Long timeoutInMinutes = DEFAULT_TIMEOUT_MINUTES;
@@ -299,7 +327,7 @@ public class MetricsDataMigrationLauncher {
whitelistedFilePath = args[1];
}
if (args.length>2) {
- startTime = Long.valueOf(args[2]);
+ startDay = Long.valueOf(args[2]);
}
if (args.length>3) {
numberOfThreads = Integer.valueOf(args[3]);
@@ -314,7 +342,7 @@ public class MetricsDataMigrationLauncher {
MetricsDataMigrationLauncher dataMigrationLauncher = null;
try {
LOG.info("Initializing system...");
- dataMigrationLauncher = new
MetricsDataMigrationLauncher(whitelistedFilePath, processedMetricsFilePath,
startTime, numberOfThreads, batchSize);
+ dataMigrationLauncher = new
MetricsDataMigrationLauncher(whitelistedFilePath, processedMetricsFilePath,
startDay, numberOfThreads, batchSize);
} catch (Exception e) {
LOG.error("Exception during system setup, exiting...", e);
System.exit(1);
diff --git
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
index 037b3d2..177d202 100644
---
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
+++
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import java.io.FileWriter;
+import java.io.Writer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
@@ -34,7 +34,7 @@ public class PhoenixClusterMetricsCopier extends
AbstractPhoenixMetricsCopier {
private static final Log LOG =
LogFactory.getLog(PhoenixClusterMetricsCopier.class);
private final Map<TimelineClusterMetric, MetricHostAggregate> aggregateMap =
new HashMap<>();
- PhoenixClusterMetricsCopier(String inputTableName, String outputTableName,
PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime,
FileWriter processedMetricsFileWriter) {
+ PhoenixClusterMetricsCopier(String inputTableName, String outputTableName,
PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, long startTime,
Writer processedMetricsFileWriter) {
super(inputTableName, outputTableName, hBaseAccessor, metricNames,
startTime, processedMetricsFileWriter);
}
diff --git
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
index 11c1df9..5964a3a 100644
---
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
+++
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
@@ -23,7 +23,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import java.io.FileWriter;
+
+import java.io.Writer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
@@ -34,7 +35,7 @@ public class PhoenixHostMetricsCopier extends
AbstractPhoenixMetricsCopier {
private static final Log LOG =
LogFactory.getLog(PhoenixHostMetricsCopier.class);
private final Map<TimelineMetric, MetricHostAggregate> aggregateMap = new
HashMap<>();
- PhoenixHostMetricsCopier(String inputTableName, String outputTableName,
PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime,
FileWriter processedMetricsFileWriter) {
+ PhoenixHostMetricsCopier(String inputTableName, String outputTableName,
PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, long startTime,
Writer processedMetricsFileWriter) {
super(inputTableName, outputTableName, hBaseAccessor, metricNames,
startTime, processedMetricsFileWriter);
}