This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch es/namespace
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/es/namespace by this push:
new 0a41143 Keep ESDao table name in control.
0a41143 is described below
commit 0a41143220e8c86ffebc64d82502389246cd12f5
Author: wusheng <[email protected]>
AuthorDate: Wed Mar 21 16:02:03 2018 +0800
Keep ESDao table name in control.
---
.../client/elasticsearch/ElasticSearchClient.java | 95 ++++++++++++++++++----
.../collector/storage/table/NamespaceHandler.java | 27 ------
.../apm/collector/storage/table/Table.java | 37 ---------
.../collector/storage/table/TableNamespace.java | 39 ---------
.../storage/es/StorageModuleEsProvider.java | 10 +--
.../es/base/dao/AbstractPersistenceEsDAO.java | 21 +++--
.../es/dao/GlobalTraceEsPersistenceDAO.java | 21 +++--
.../es/dao/SegmentDurationEsPersistenceDAO.java | 21 +++--
.../storage/es/dao/SegmentEsPersistenceDAO.java | 21 +++--
.../alarm/ApplicationAlarmEsPersistenceDAO.java | 31 ++++---
.../ApplicationReferenceAlarmEsPersistenceDAO.java | 33 ++++----
...licationReferenceAlarmListEsPersistenceDAO.java | 33 ++++----
.../dao/alarm/InstanceAlarmEsPersistenceDAO.java | 33 ++++----
.../alarm/InstanceAlarmListEsPersistenceDAO.java | 33 ++++----
.../InstanceReferenceAlarmEsPersistenceDAO.java | 37 +++++----
...InstanceReferenceAlarmListEsPersistenceDAO.java | 37 +++++----
.../es/dao/alarm/ServiceAlarmEsPersistenceDAO.java | 35 ++++----
.../alarm/ServiceAlarmListEsPersistenceDAO.java | 35 ++++----
.../ServiceReferenceAlarmEsPersistenceDAO.java | 45 +++++-----
.../ServiceReferenceAlarmListEsPersistenceDAO.java | 45 +++++-----
.../storage/es/dao/ui/CpuMetricEsUIDAO.java | 25 +++---
.../storage/es/dao/ui/GCMetricEsUIDAO.java | 23 ++++--
.../storage/es/dao/ui/InstanceMetricEsUIDAO.java | 56 +++++++------
.../storage/es/dao/ui/MemoryMetricEsUIDAO.java | 33 ++++----
.../storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java | 28 +------
.../storage/es/dao/ui/ServiceMetricEsUIDAO.java | 80 +++++++++---------
26 files changed, 481 insertions(+), 453 deletions(-)
diff --git
a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
index f639f81..6d2f866 100644
---
a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
+++
b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
@@ -19,12 +19,9 @@
package org.apache.skywalking.apm.collector.client.elasticsearch;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
+import org.apache.skywalking.apm.collector.client.Client;
import org.apache.skywalking.apm.collector.client.ClientException;
+import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
@@ -36,16 +33,25 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequestBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
-import org.apache.skywalking.apm.collector.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
/**
* @author peng-yongsheng
*/
@@ -55,37 +61,42 @@ public class ElasticSearchClient implements Client {
private org.elasticsearch.client.Client client;
+ private final String namespace;
+
private final String clusterName;
private final Boolean clusterTransportSniffer;
private final String clusterNodes;
- public ElasticSearchClient(String clusterName, Boolean
clusterTransportSniffer, String clusterNodes) {
+ public ElasticSearchClient(String namespace, String clusterName, Boolean
clusterTransportSniffer, String clusterNodes) {
+ this.namespace = namespace;
this.clusterName = clusterName;
this.clusterTransportSniffer = clusterTransportSniffer;
this.clusterNodes = clusterNodes;
}
- @Override public void initialize() throws ClientException {
+ @Override
+ public void initialize() throws ClientException {
Settings settings = Settings.builder()
- .put("cluster.name", clusterName)
- .put("client.transport.sniff", clusterTransportSniffer)
- .build();
+ .put("cluster.name", clusterName)
+ .put("client.transport.sniff", clusterTransportSniffer)
+ .build();
client = new PreBuiltTransportClient(settings);
List<AddressPairs> pairsList = parseClusterNodes(clusterNodes);
for (AddressPairs pairs : pairsList) {
try {
- ((PreBuiltTransportClient)client).addTransportAddress(new
InetSocketTransportAddress(InetAddress.getByName(pairs.host), pairs.port));
+ ((PreBuiltTransportClient) client).addTransportAddress(new
InetSocketTransportAddress(InetAddress.getByName(pairs.host), pairs.port));
} catch (UnknownHostException e) {
throw new ElasticSearchClientException(e.getMessage(), e);
}
}
}
- @Override public void shutdown() {
+ @Override
+ public void shutdown() {
}
@@ -114,12 +125,14 @@ public class ElasticSearchClient implements Client {
public boolean createIndex(String indexName, String indexType, Settings
settings, XContentBuilder mappingBuilder) {
IndicesAdminClient adminClient = client.admin().indices();
+ indexName = formatIndexName(indexName);
CreateIndexResponse response =
adminClient.prepareCreate(indexName).setSettings(settings).addMapping(indexType,
mappingBuilder).get();
logger.info("create {} index with type of {} finished, isAcknowledged:
{}", indexName, indexType, response.isAcknowledged());
return response.isShardsAcked();
}
public boolean deleteIndex(String indexName) {
+ indexName = formatIndexName(indexName);
IndicesAdminClient adminClient = client.admin().indices();
DeleteIndexResponse response =
adminClient.prepareDelete(indexName).get();
logger.info("delete {} index finished, isAcknowledged: {}", indexName,
response.isAcknowledged());
@@ -127,35 +140,72 @@ public class ElasticSearchClient implements Client {
}
public boolean isExistsIndex(String indexName) {
+ indexName = formatIndexName(indexName);
IndicesAdminClient adminClient = client.admin().indices();
IndicesExistsResponse response =
adminClient.prepareExists(indexName).get();
return response.isExists();
}
public SearchRequestBuilder prepareSearch(String indexName) {
+ indexName = formatIndexName(indexName);
return client.prepareSearch(indexName);
}
public IndexRequestBuilder prepareIndex(String indexName, String id) {
+ indexName = formatIndexName(indexName);
return client.prepareIndex(indexName, "type", id);
}
public UpdateRequestBuilder prepareUpdate(String indexName, String id) {
+ indexName = formatIndexName(indexName);
return client.prepareUpdate(indexName, "type", id);
}
public GetRequestBuilder prepareGet(String indexName, String id) {
+ indexName = formatIndexName(indexName);
return client.prepareGet(indexName, "type", id);
}
- public DeleteByQueryRequestBuilder prepareDelete() {
- return DeleteByQueryAction.INSTANCE.newRequestBuilder(client);
+ public DeleteByQueryRequestBuilder prepareDelete(QueryBuilder
queryBuilder, String indexName) {
+ indexName = formatIndexName(indexName);
+ return
DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(queryBuilder).source(indexName);
+ }
+
+ public MultiGetRequestBuilder prepareMultiGet(List<?> rows,
MultiGetRowHandler rowHandler) {
+ MultiGetRequestBuilder prepareMultiGet = client.prepareMultiGet();
+ rowHandler.setPrepareMultiGet(prepareMultiGet);
+ rowHandler.setNamespace(namespace);
+
+ rows.forEach(row -> {
+ rowHandler.accept(row);
+ });
+
+ return rowHandler.getPrepareMultiGet();
}
- public MultiGetRequestBuilder prepareMultiGet() {
- return client.prepareMultiGet();
+ public abstract static class MultiGetRowHandler<T> implements Consumer<T> {
+ private MultiGetRequestBuilder prepareMultiGet;
+ private String namespace;
+
+ public void setPrepareMultiGet(MultiGetRequestBuilder prepareMultiGet)
{
+ this.prepareMultiGet = prepareMultiGet;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
+ public void add(String indexName, @Nullable String type, String id) {
+ indexName = formatIndexName(namespace, indexName);
+ prepareMultiGet = prepareMultiGet.add(indexName, type, id);
+ }
+
+ private MultiGetRequestBuilder getPrepareMultiGet() {
+ return prepareMultiGet;
+ }
}
+
public BulkRequestBuilder prepareBulk() {
return client.prepareBulk();
}
@@ -167,4 +217,15 @@ public class ElasticSearchClient implements Client {
logger.error(e.getMessage(), e);
}
}
+
+ private String formatIndexName(String indexName) {
+ return formatIndexName(this.namespace, indexName);
+ }
+
+ private static String formatIndexName(String namespace, String indexName) {
+ if (StringUtils.isNotEmpty(namespace)) {
+ return namespace + "_" + indexName;
+ }
+ return indexName;
+ }
}
diff --git
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/NamespaceHandler.java
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/NamespaceHandler.java
deleted file mode 100644
index 77df153..0000000
---
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/NamespaceHandler.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.apm.collector.storage.table;
-
-/**
- * <code>NamespaceHandler</code> represents the implementation which controls
a namespace
- * for storage module.
- */
-public interface NamespaceHandler {
- String getNamespace();
-}
diff --git
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/Table.java
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/Table.java
deleted file mode 100644
index 233d9f1..0000000
---
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/Table.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.apm.collector.storage.table;
-
-import org.apache.skywalking.apm.collector.core.data.CommonTable;
-
-/**
- * <code>Table</code> used as the basic type of all tables/types for storage
implementations,
- * which support namespace
- */
-public abstract class Table extends CommonTable {
-
-
- public String getTableName() {
- return TableNamespace.INSTANCE.namespace() + tableName();
- }
-
- protected abstract String tableName();
-
-
-}
diff --git
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/TableNamespace.java
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/TableNamespace.java
deleted file mode 100644
index 0868468..0000000
---
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/TableNamespace.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.apm.collector.storage.table;
-
-public enum TableNamespace {
- INSTANCE;
-
- private NamespaceHandler handler = new NamespaceHandler() {
-
- @Override
- public String getNamespace() {
- return "";
- }
- };
-
- public void setHandler(NamespaceHandler namespaceHandler) {
- this.handler = namespaceHandler;
- }
-
- public String namespace() {
- return handler.getNamespace();
- }
-}
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
index 8b13744..ff52285 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
@@ -141,8 +141,6 @@ import
org.apache.skywalking.apm.collector.storage.es.dao.srmp.ServiceReferenceH
import
org.apache.skywalking.apm.collector.storage.es.dao.srmp.ServiceReferenceMinuteMetricEsPersistenceDAO;
import
org.apache.skywalking.apm.collector.storage.es.dao.srmp.ServiceReferenceMonthMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ui.*;
-import org.apache.skywalking.apm.collector.storage.table.TableNamespace;
-import org.apache.skywalking.apm.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -182,7 +180,8 @@ public class StorageModuleEsProvider extends ModuleProvider
{
String clusterName = config.getProperty(CLUSTER_NAME);
Boolean clusterTransportSniffer = (Boolean)
config.get(CLUSTER_TRANSPORT_SNIFFER);
String clusterNodes = config.getProperty(CLUSTER_NODES);
- elasticSearchClient = new ElasticSearchClient(clusterName,
clusterTransportSniffer, clusterNodes);
+ String namespace =
getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace();
+ elasticSearchClient = new ElasticSearchClient(namespace, clusterName,
clusterTransportSniffer, clusterNodes);
this.registerServiceImplementation(IBatchDAO.class, new
BatchEsDAO(elasticSearchClient));
registerCacheDAO();
@@ -194,11 +193,6 @@ public class StorageModuleEsProvider extends
ModuleProvider {
@Override
public void start(Properties config) throws ServiceNotProvidedException {
- String namespace =
getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace();
- if (!StringUtil.isEmpty(namespace)) {
- TableNamespace.INSTANCE.setHandler(() -> namespace + "_");
- }
-
Integer indexShardsNumber = (Integer) config.get(INDEX_SHARDS_NUMBER);
Integer indexReplicasNumber = (Integer)
config.get(INDEX_REPLICAS_NUMBER);
try {
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
index e98d2b5..0c44eca 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.collector.storage.es.base.dao;
import java.util.Map;
+
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
@@ -46,7 +47,8 @@ public abstract class AbstractPersistenceEsDAO<STREAM_DATA
extends StreamData> e
protected abstract String tableName();
- @Override public final STREAM_DATA get(String id) {
+ @Override
+ public final STREAM_DATA get(String id) {
GetResponse getResponse = getClient().prepareGet(tableName(),
id).get();
if (getResponse.isExists()) {
STREAM_DATA streamData =
esDataToStreamData(getResponse.getSource());
@@ -59,25 +61,28 @@ public abstract class AbstractPersistenceEsDAO<STREAM_DATA
extends StreamData> e
protected abstract Map<String, Object> esStreamDataToEsData(STREAM_DATA
streamData);
- @Override public final IndexRequestBuilder prepareBatchInsert(STREAM_DATA
streamData) {
+ @Override
+ public final IndexRequestBuilder prepareBatchInsert(STREAM_DATA
streamData) {
Map<String, Object> source = esStreamDataToEsData(streamData);
return getClient().prepareIndex(tableName(),
streamData.getId()).setSource(source);
}
- @Override public final UpdateRequestBuilder prepareBatchUpdate(STREAM_DATA
streamData) {
+ @Override
+ public final UpdateRequestBuilder prepareBatchUpdate(STREAM_DATA
streamData) {
Map<String, Object> source = esStreamDataToEsData(streamData);
return getClient().prepareUpdate(tableName(),
streamData.getId()).setDoc(source);
}
protected abstract String timeBucketColumnNameForDelete();
- @Override public final void deleteHistory(Long startTimestamp, Long
endTimestamp) {
+ @Override
+ public final void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
-
.filter(QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).gte(startTimeBucket).lte(endTimeBucket))
- .source(tableName())
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+
QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).gte(startTimeBucket).lte(endTimeBucket)
+ , tableName())
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted,
tableName());
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java
index 2b9f23a..b134636 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.HashMap;
import java.util.Map;
+
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
@@ -45,15 +46,18 @@ public class GlobalTraceEsPersistenceDAO extends EsDAO
implements IGlobalTracePe
super(client);
}
- @Override public GlobalTrace get(String id) {
+ @Override
+ public GlobalTrace get(String id) {
throw new UnexpectedException("There is no need to merge stream data
with database data.");
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(GlobalTrace data)
{
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(GlobalTrace data) {
throw new UnexpectedException("There is no need to merge stream data
with database data.");
}
- @Override public IndexRequestBuilder prepareBatchInsert(GlobalTrace data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(GlobalTrace data) {
Map<String, Object> source = new HashMap<>();
source.put(GlobalTraceTable.COLUMN_SEGMENT_ID, data.getSegmentId());
source.put(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID,
data.getGlobalTraceId());
@@ -62,13 +66,14 @@ public class GlobalTraceEsPersistenceDAO extends EsDAO
implements IGlobalTracePe
return getClient().prepareIndex(GlobalTraceTable.TABLE,
data.getId()).setSource(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long
endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
-
.filter(QueryBuilders.rangeQuery(GlobalTraceTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(GlobalTraceTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+
QueryBuilders.rangeQuery(GlobalTraceTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ GlobalTraceTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted,
GlobalTraceTable.TABLE);
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
index 611666e..363bbc0 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.HashMap;
import java.util.Map;
+
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import
org.apache.skywalking.apm.collector.storage.dao.ISegmentDurationPersistenceDAO;
@@ -44,15 +45,18 @@ public class SegmentDurationEsPersistenceDAO extends EsDAO
implements ISegmentDu
super(client);
}
- @Override public SegmentDuration get(String id) {
+ @Override
+ public SegmentDuration get(String id) {
return null;
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(SegmentDuration
data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(SegmentDuration data) {
return null;
}
- @Override public IndexRequestBuilder prepareBatchInsert(SegmentDuration
data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(SegmentDuration data) {
logger.debug("segment cost prepareBatchInsert, getApplicationId: {}",
data.getId());
Map<String, Object> source = new HashMap<>();
source.put(SegmentDurationTable.COLUMN_SEGMENT_ID,
data.getSegmentId());
@@ -67,13 +71,14 @@ public class SegmentDurationEsPersistenceDAO extends EsDAO
implements ISegmentDu
return getClient().prepareIndex(SegmentDurationTable.TABLE,
data.getId()).setSource(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long
endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
-
.filter(QueryBuilders.rangeQuery(SegmentDurationTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(SegmentDurationTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+
QueryBuilders.rangeQuery(SegmentDurationTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ SegmentDurationTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted,
SegmentDurationTable.TABLE);
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java
index 61bb6ad..31cda7e 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
+
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
@@ -45,15 +46,18 @@ public class SegmentEsPersistenceDAO extends EsDAO
implements ISegmentPersistenc
super(client);
}
- @Override public Segment get(String id) {
+ @Override
+ public Segment get(String id) {
return null;
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(Segment data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(Segment data) {
return null;
}
- @Override public IndexRequestBuilder prepareBatchInsert(Segment data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(Segment data) {
Map<String, Object> source = new HashMap<>();
source.put(SegmentTable.COLUMN_DATA_BINARY, new
String(Base64.getEncoder().encode(data.getDataBinary())));
source.put(SegmentTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
@@ -61,13 +65,14 @@ public class SegmentEsPersistenceDAO extends EsDAO
implements ISegmentPersistenc
return getClient().prepareIndex(SegmentTable.TABLE,
data.getId()).setSource(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long
endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
-
.filter(QueryBuilders.rangeQuery(SegmentTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(SegmentTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+
QueryBuilders.rangeQuery(SegmentTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ SegmentTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted,
SegmentTable.TABLE);
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
index 4e3b058..b1a4bda 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package
org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import
org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmPersistenceDAO;
@@ -45,27 +46,29 @@ public class ApplicationAlarmEsPersistenceDAO extends EsDAO
implements IApplicat
super(client);
}
- @Override public ApplicationAlarm get(String id) {
+ @Override
+ public ApplicationAlarm get(String id) {
GetResponse getResponse =
getClient().prepareGet(ApplicationAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
ApplicationAlarm instanceAlarm = new ApplicationAlarm();
instanceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
-
instanceAlarm.setApplicationId(((Number)source.get(ApplicationAlarmTable.COLUMN_APPLICATION_ID)).intValue());
-
instanceAlarm.setSourceValue(((Number)source.get(ApplicationAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+ instanceAlarm.setApplicationId(((Number)
source.get(ApplicationAlarmTable.COLUMN_APPLICATION_ID)).intValue());
+ instanceAlarm.setSourceValue(((Number)
source.get(ApplicationAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
-
instanceAlarm.setAlarmType(((Number)source.get(ApplicationAlarmTable.COLUMN_ALARM_TYPE)).intValue());
-
instanceAlarm.setAlarmContent((String)source.get(ApplicationAlarmTable.COLUMN_ALARM_CONTENT));
+ instanceAlarm.setAlarmType(((Number)
source.get(ApplicationAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+ instanceAlarm.setAlarmContent((String)
source.get(ApplicationAlarmTable.COLUMN_ALARM_CONTENT));
-
instanceAlarm.setLastTimeBucket(((Number)source.get(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+ instanceAlarm.setLastTimeBucket(((Number)
source.get(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return instanceAlarm;
} else {
return null;
}
}
- @Override public IndexRequestBuilder prepareBatchInsert(ApplicationAlarm
data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(ApplicationAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationAlarmTable.COLUMN_APPLICATION_ID,
data.getApplicationId());
source.put(ApplicationAlarmTable.COLUMN_SOURCE_VALUE,
data.getSourceValue());
@@ -78,7 +81,8 @@ public class ApplicationAlarmEsPersistenceDAO extends EsDAO
implements IApplicat
return getClient().prepareIndex(ApplicationAlarmTable.TABLE,
data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(ApplicationAlarm
data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(ApplicationAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationAlarmTable.COLUMN_APPLICATION_ID,
data.getApplicationId());
source.put(ApplicationAlarmTable.COLUMN_SOURCE_VALUE,
data.getSourceValue());
@@ -91,13 +95,14 @@ public class ApplicationAlarmEsPersistenceDAO extends EsDAO
implements IApplicat
return getClient().prepareUpdate(ApplicationAlarmTable.TABLE,
data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long
endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
-
.filter(QueryBuilders.rangeQuery(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(ApplicationAlarmTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+
QueryBuilders.rangeQuery(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket)
+ , ApplicationAlarmTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted,
ApplicationAlarmTable.TABLE);
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java
index 30cb14b..33c1111 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package
org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import
org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmPersistenceDAO;
@@ -45,27 +46,29 @@ public class ApplicationReferenceAlarmEsPersistenceDAO
extends EsDAO implements
super(client);
}
- @Override public ApplicationReferenceAlarm get(String id) {
+ @Override
+ public ApplicationReferenceAlarm get(String id) {
GetResponse getResponse =
getClient().prepareGet(ApplicationReferenceAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
ApplicationReferenceAlarm applicationReferenceAlarm = new
ApplicationReferenceAlarm();
applicationReferenceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
-
applicationReferenceAlarm.setFrontApplicationId(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
-
applicationReferenceAlarm.setBehindApplicationId(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
-
applicationReferenceAlarm.setSourceValue(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+ applicationReferenceAlarm.setFrontApplicationId(((Number)
source.get(ApplicationReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+ applicationReferenceAlarm.setBehindApplicationId(((Number)
source.get(ApplicationReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+ applicationReferenceAlarm.setSourceValue(((Number)
source.get(ApplicationReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
-
applicationReferenceAlarm.setAlarmType(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
-
applicationReferenceAlarm.setAlarmContent((String)source.get(ApplicationReferenceAlarmTable.COLUMN_ALARM_CONTENT));
+ applicationReferenceAlarm.setAlarmType(((Number)
source.get(ApplicationReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+ applicationReferenceAlarm.setAlarmContent((String)
source.get(ApplicationReferenceAlarmTable.COLUMN_ALARM_CONTENT));
-
applicationReferenceAlarm.setLastTimeBucket(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+ applicationReferenceAlarm.setLastTimeBucket(((Number)
source.get(ApplicationReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return applicationReferenceAlarm;
} else {
return null;
}
}
- @Override public IndexRequestBuilder
prepareBatchInsert(ApplicationReferenceAlarm data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(ApplicationReferenceAlarm
data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID,
data.getFrontApplicationId());
source.put(ApplicationReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID,
data.getBehindApplicationId());
@@ -79,7 +82,8 @@ public class ApplicationReferenceAlarmEsPersistenceDAO
extends EsDAO implements
return getClient().prepareIndex(ApplicationReferenceAlarmTable.TABLE,
data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder
prepareBatchUpdate(ApplicationReferenceAlarm data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(ApplicationReferenceAlarm
data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID,
data.getFrontApplicationId());
source.put(ApplicationReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID,
data.getBehindApplicationId());
@@ -93,13 +97,14 @@ public class ApplicationReferenceAlarmEsPersistenceDAO
extends EsDAO implements
return getClient().prepareUpdate(ApplicationReferenceAlarmTable.TABLE,
data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long
endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
-
.filter(QueryBuilders.rangeQuery(ApplicationReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(ApplicationReferenceAlarmTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+
QueryBuilders.rangeQuery(ApplicationReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ ApplicationReferenceAlarmTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted,
ApplicationReferenceAlarmTable.TABLE);
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java
index 4b7469c..062c794 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package
org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import
org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmListPersistenceDAO;
@@ -45,27 +46,29 @@ public class ApplicationReferenceAlarmListEsPersistenceDAO
extends EsDAO impleme
super(client);
}
- @Override public ApplicationReferenceAlarmList get(String id) {
+ @Override
+ public ApplicationReferenceAlarmList get(String id) {
GetResponse getResponse =
getClient().prepareGet(ApplicationReferenceAlarmListTable.TABLE, id).get();
if (getResponse.isExists()) {
ApplicationReferenceAlarmList applicationReferenceAlarmList = new
ApplicationReferenceAlarmList();
applicationReferenceAlarmList.setId(id);
Map<String, Object> source = getResponse.getSource();
-
applicationReferenceAlarmList.setFrontApplicationId(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
-
applicationReferenceAlarmList.setBehindApplicationId(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
-
applicationReferenceAlarmList.setSourceValue(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
+ applicationReferenceAlarmList.setFrontApplicationId(((Number)
source.get(ApplicationReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+ applicationReferenceAlarmList.setBehindApplicationId(((Number)
source.get(ApplicationReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+ applicationReferenceAlarmList.setSourceValue(((Number)
source.get(ApplicationReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
-
applicationReferenceAlarmList.setAlarmType(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
-
applicationReferenceAlarmList.setAlarmContent((String)source.get(ApplicationReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
+ applicationReferenceAlarmList.setAlarmType(((Number)
source.get(ApplicationReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
+ applicationReferenceAlarmList.setAlarmContent((String)
source.get(ApplicationReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
-
applicationReferenceAlarmList.setTimeBucket(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
+ applicationReferenceAlarmList.setTimeBucket(((Number)
source.get(ApplicationReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
return applicationReferenceAlarmList;
} else {
return null;
}
}
- @Override public IndexRequestBuilder
prepareBatchInsert(ApplicationReferenceAlarmList data) {
+ @Override
+ public IndexRequestBuilder
prepareBatchInsert(ApplicationReferenceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID,
data.getFrontApplicationId());
source.put(ApplicationReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID,
data.getBehindApplicationId());
@@ -79,7 +82,8 @@ public class ApplicationReferenceAlarmListEsPersistenceDAO
extends EsDAO impleme
return
getClient().prepareIndex(ApplicationReferenceAlarmListTable.TABLE,
data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder
prepareBatchUpdate(ApplicationReferenceAlarmList data) {
+ @Override
+ public UpdateRequestBuilder
prepareBatchUpdate(ApplicationReferenceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(ApplicationReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID,
data.getFrontApplicationId());
source.put(ApplicationReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID,
data.getBehindApplicationId());
@@ -93,13 +97,14 @@ public class ApplicationReferenceAlarmListEsPersistenceDAO
extends EsDAO impleme
return
getClient().prepareUpdate(ApplicationReferenceAlarmListTable.TABLE,
data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long
endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
-
.filter(QueryBuilders.rangeQuery(ApplicationReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(ApplicationReferenceAlarmListTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+
QueryBuilders.rangeQuery(ApplicationReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ ApplicationReferenceAlarmListTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted,
ApplicationReferenceAlarmListTable.TABLE);
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java
index 64631c6..cec07f4 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package
org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import
org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmPersistenceDAO;
@@ -45,27 +46,29 @@ public class InstanceAlarmEsPersistenceDAO extends EsDAO
implements IInstanceAla
super(client);
}
- @Override public InstanceAlarm get(String id) {
+ @Override
+ public InstanceAlarm get(String id) {
GetResponse getResponse =
getClient().prepareGet(InstanceAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
InstanceAlarm instanceAlarm = new InstanceAlarm();
instanceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
-
instanceAlarm.setApplicationId(((Number)source.get(InstanceAlarmTable.COLUMN_APPLICATION_ID)).intValue());
-
instanceAlarm.setInstanceId(((Number)source.get(InstanceAlarmTable.COLUMN_INSTANCE_ID)).intValue());
-
instanceAlarm.setSourceValue(((Number)source.get(InstanceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+ instanceAlarm.setApplicationId(((Number)
source.get(InstanceAlarmTable.COLUMN_APPLICATION_ID)).intValue());
+ instanceAlarm.setInstanceId(((Number)
source.get(InstanceAlarmTable.COLUMN_INSTANCE_ID)).intValue());
+ instanceAlarm.setSourceValue(((Number)
source.get(InstanceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
-
instanceAlarm.setAlarmType(((Number)source.get(InstanceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
-
instanceAlarm.setAlarmContent((String)source.get(InstanceAlarmTable.COLUMN_ALARM_CONTENT));
+ instanceAlarm.setAlarmType(((Number)
source.get(InstanceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+ instanceAlarm.setAlarmContent((String)
source.get(InstanceAlarmTable.COLUMN_ALARM_CONTENT));
-
instanceAlarm.setLastTimeBucket(((Number)source.get(InstanceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+ instanceAlarm.setLastTimeBucket(((Number)
source.get(InstanceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return instanceAlarm;
} else {
return null;
}
}
- @Override public IndexRequestBuilder prepareBatchInsert(InstanceAlarm
data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(InstanceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceAlarmTable.COLUMN_APPLICATION_ID,
data.getApplicationId());
source.put(InstanceAlarmTable.COLUMN_INSTANCE_ID,
data.getInstanceId());
@@ -79,7 +82,8 @@ public class InstanceAlarmEsPersistenceDAO extends EsDAO
implements IInstanceAla
return getClient().prepareIndex(InstanceAlarmTable.TABLE,
data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(InstanceAlarm
data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(InstanceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceAlarmTable.COLUMN_APPLICATION_ID,
data.getApplicationId());
source.put(InstanceAlarmTable.COLUMN_INSTANCE_ID,
data.getInstanceId());
@@ -93,13 +97,14 @@ public class InstanceAlarmEsPersistenceDAO extends EsDAO
implements IInstanceAla
return getClient().prepareUpdate(InstanceAlarmTable.TABLE,
data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long
endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
-
.filter(QueryBuilders.rangeQuery(InstanceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(InstanceAlarmTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+
QueryBuilders.rangeQuery(InstanceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ InstanceAlarmTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted,
InstanceAlarmTable.TABLE);
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java
index 47cefae..ac350bd 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package
org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import
org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmListPersistenceDAO;
@@ -45,27 +46,29 @@ public class InstanceAlarmListEsPersistenceDAO extends
EsDAO implements IInstanc
super(client);
}
- @Override public InstanceAlarmList get(String id) {
+ @Override
+ public InstanceAlarmList get(String id) {
GetResponse getResponse =
getClient().prepareGet(InstanceAlarmListTable.TABLE, id).get();
if (getResponse.isExists()) {
InstanceAlarmList instanceAlarmList = new InstanceAlarmList();
instanceAlarmList.setId(id);
Map<String, Object> source = getResponse.getSource();
-
instanceAlarmList.setApplicationId(((Number)source.get(InstanceAlarmListTable.COLUMN_APPLICATION_ID)).intValue());
-
instanceAlarmList.setInstanceId(((Number)source.get(InstanceAlarmListTable.COLUMN_INSTANCE_ID)).intValue());
-
instanceAlarmList.setSourceValue(((Number)source.get(InstanceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
+ instanceAlarmList.setApplicationId(((Number)
source.get(InstanceAlarmListTable.COLUMN_APPLICATION_ID)).intValue());
+ instanceAlarmList.setInstanceId(((Number)
source.get(InstanceAlarmListTable.COLUMN_INSTANCE_ID)).intValue());
+ instanceAlarmList.setSourceValue(((Number)
source.get(InstanceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
-
instanceAlarmList.setAlarmType(((Number)source.get(InstanceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
-
instanceAlarmList.setAlarmContent((String)source.get(InstanceAlarmListTable.COLUMN_ALARM_CONTENT));
+ instanceAlarmList.setAlarmType(((Number)
source.get(InstanceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
+ instanceAlarmList.setAlarmContent((String)
source.get(InstanceAlarmListTable.COLUMN_ALARM_CONTENT));
-
instanceAlarmList.setTimeBucket(((Number)source.get(InstanceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
+ instanceAlarmList.setTimeBucket(((Number)
source.get(InstanceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
return instanceAlarmList;
} else {
return null;
}
}
- @Override public IndexRequestBuilder prepareBatchInsert(InstanceAlarmList
data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(InstanceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceAlarmListTable.COLUMN_APPLICATION_ID,
data.getApplicationId());
source.put(InstanceAlarmListTable.COLUMN_INSTANCE_ID,
data.getInstanceId());
@@ -79,7 +82,8 @@ public class InstanceAlarmListEsPersistenceDAO extends EsDAO
implements IInstanc
return getClient().prepareIndex(InstanceAlarmListTable.TABLE,
data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(InstanceAlarmList
data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(InstanceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceAlarmListTable.COLUMN_APPLICATION_ID,
data.getApplicationId());
source.put(InstanceAlarmListTable.COLUMN_INSTANCE_ID,
data.getInstanceId());
@@ -93,13 +97,14 @@ public class InstanceAlarmListEsPersistenceDAO extends
EsDAO implements IInstanc
return getClient().prepareUpdate(InstanceAlarmListTable.TABLE,
data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long
endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
-
.filter(QueryBuilders.rangeQuery(InstanceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(InstanceAlarmListTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+
QueryBuilders.rangeQuery(InstanceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ InstanceAlarmListTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted,
InstanceAlarmListTable.TABLE);
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java
index fd12f43..79d21da 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package
org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import
org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmPersistenceDAO;
@@ -45,29 +46,31 @@ public class InstanceReferenceAlarmEsPersistenceDAO extends
EsDAO implements IIn
super(client);
}
- @Override public InstanceReferenceAlarm get(String id) {
+ @Override
+ public InstanceReferenceAlarm get(String id) {
GetResponse getResponse =
getClient().prepareGet(InstanceReferenceAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
InstanceReferenceAlarm instanceReferenceAlarm = new
InstanceReferenceAlarm();
instanceReferenceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
-
instanceReferenceAlarm.setFrontApplicationId(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
-
instanceReferenceAlarm.setBehindApplicationId(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
-
instanceReferenceAlarm.setFrontInstanceId(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
-
instanceReferenceAlarm.setBehindInstanceId(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
-
instanceReferenceAlarm.setSourceValue(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+ instanceReferenceAlarm.setFrontApplicationId(((Number)
source.get(InstanceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+ instanceReferenceAlarm.setBehindApplicationId(((Number)
source.get(InstanceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+ instanceReferenceAlarm.setFrontInstanceId(((Number)
source.get(InstanceReferenceAlarmTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
+ instanceReferenceAlarm.setBehindInstanceId(((Number)
source.get(InstanceReferenceAlarmTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
+ instanceReferenceAlarm.setSourceValue(((Number)
source.get(InstanceReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
-
instanceReferenceAlarm.setAlarmType(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
-
instanceReferenceAlarm.setAlarmContent((String)source.get(InstanceReferenceAlarmTable.COLUMN_ALARM_CONTENT));
+ instanceReferenceAlarm.setAlarmType(((Number)
source.get(InstanceReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+ instanceReferenceAlarm.setAlarmContent((String)
source.get(InstanceReferenceAlarmTable.COLUMN_ALARM_CONTENT));
-
instanceReferenceAlarm.setLastTimeBucket(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+ instanceReferenceAlarm.setLastTimeBucket(((Number)
source.get(InstanceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return instanceReferenceAlarm;
} else {
return null;
}
}
- @Override public IndexRequestBuilder
prepareBatchInsert(InstanceReferenceAlarm data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(InstanceReferenceAlarm data)
{
Map<String, Object> source = new HashMap<>();
source.put(InstanceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID,
data.getFrontApplicationId());
source.put(InstanceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID,
data.getBehindApplicationId());
@@ -83,7 +86,8 @@ public class InstanceReferenceAlarmEsPersistenceDAO extends
EsDAO implements IIn
return getClient().prepareIndex(InstanceReferenceAlarmTable.TABLE,
data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder
prepareBatchUpdate(InstanceReferenceAlarm data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(InstanceReferenceAlarm
data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID,
data.getFrontApplicationId());
source.put(InstanceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID,
data.getBehindApplicationId());
@@ -99,13 +103,14 @@ public class InstanceReferenceAlarmEsPersistenceDAO
extends EsDAO implements IIn
return getClient().prepareUpdate(InstanceReferenceAlarmTable.TABLE,
data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long
endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
-
.filter(QueryBuilders.rangeQuery(InstanceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(InstanceReferenceAlarmTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+
QueryBuilders.rangeQuery(InstanceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ InstanceReferenceAlarmTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted,
InstanceReferenceAlarmTable.TABLE);
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java
index 7738849..2c93cbb 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package
org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import
org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmListPersistenceDAO;
@@ -45,29 +46,31 @@ public class InstanceReferenceAlarmListEsPersistenceDAO
extends EsDAO implements
super(client);
}
- @Override public InstanceReferenceAlarmList get(String id) {
+ @Override
+ public InstanceReferenceAlarmList get(String id) {
GetResponse getResponse =
getClient().prepareGet(InstanceReferenceAlarmListTable.TABLE, id).get();
if (getResponse.isExists()) {
InstanceReferenceAlarmList serviceReferenceAlarmList = new
InstanceReferenceAlarmList();
serviceReferenceAlarmList.setId(id);
Map<String, Object> source = getResponse.getSource();
-
serviceReferenceAlarmList.setFrontApplicationId(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
-
serviceReferenceAlarmList.setBehindApplicationId(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
-
serviceReferenceAlarmList.setFrontInstanceId(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
-
serviceReferenceAlarmList.setBehindInstanceId(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
-
serviceReferenceAlarmList.setSourceValue(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
+ serviceReferenceAlarmList.setFrontApplicationId(((Number)
source.get(InstanceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+ serviceReferenceAlarmList.setBehindApplicationId(((Number)
source.get(InstanceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+ serviceReferenceAlarmList.setFrontInstanceId(((Number)
source.get(InstanceReferenceAlarmListTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
+ serviceReferenceAlarmList.setBehindInstanceId(((Number)
source.get(InstanceReferenceAlarmListTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
+ serviceReferenceAlarmList.setSourceValue(((Number)
source.get(InstanceReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
-
serviceReferenceAlarmList.setAlarmType(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
-
serviceReferenceAlarmList.setAlarmContent((String)source.get(InstanceReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
+ serviceReferenceAlarmList.setAlarmType(((Number)
source.get(InstanceReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
+ serviceReferenceAlarmList.setAlarmContent((String)
source.get(InstanceReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
-
serviceReferenceAlarmList.setTimeBucket(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
+ serviceReferenceAlarmList.setTimeBucket(((Number)
source.get(InstanceReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
return serviceReferenceAlarmList;
} else {
return null;
}
}
- @Override public IndexRequestBuilder
prepareBatchInsert(InstanceReferenceAlarmList data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(InstanceReferenceAlarmList
data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID,
data.getFrontApplicationId());
source.put(InstanceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID,
data.getBehindApplicationId());
@@ -83,7 +86,8 @@ public class InstanceReferenceAlarmListEsPersistenceDAO
extends EsDAO implements
return getClient().prepareIndex(InstanceReferenceAlarmListTable.TABLE,
data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder
prepareBatchUpdate(InstanceReferenceAlarmList data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(InstanceReferenceAlarmList
data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID,
data.getFrontApplicationId());
source.put(InstanceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID,
data.getBehindApplicationId());
@@ -99,13 +103,14 @@ public class InstanceReferenceAlarmListEsPersistenceDAO
extends EsDAO implements
return
getClient().prepareUpdate(InstanceReferenceAlarmListTable.TABLE,
data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long
endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
-
.filter(QueryBuilders.rangeQuery(InstanceReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(InstanceReferenceAlarmListTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+
QueryBuilders.rangeQuery(InstanceReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ InstanceReferenceAlarmListTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted,
InstanceReferenceAlarmListTable.TABLE);
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java
index 44d8265..a48a79d 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package
org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import
org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmPersistenceDAO;
@@ -45,28 +46,30 @@ public class ServiceAlarmEsPersistenceDAO extends EsDAO
implements IServiceAlarm
super(client);
}
- @Override public ServiceAlarm get(String id) {
+ @Override
+ public ServiceAlarm get(String id) {
GetResponse getResponse =
getClient().prepareGet(ServiceAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
ServiceAlarm serviceAlarm = new ServiceAlarm();
serviceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
-
serviceAlarm.setApplicationId(((Number)source.get(ServiceAlarmTable.COLUMN_APPLICATION_ID)).intValue());
-
serviceAlarm.setInstanceId(((Number)source.get(ServiceAlarmTable.COLUMN_INSTANCE_ID)).intValue());
-
serviceAlarm.setServiceId(((Number)source.get(ServiceAlarmTable.COLUMN_SERVICE_ID)).intValue());
-
serviceAlarm.setSourceValue(((Number)source.get(ServiceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+ serviceAlarm.setApplicationId(((Number)
source.get(ServiceAlarmTable.COLUMN_APPLICATION_ID)).intValue());
+ serviceAlarm.setInstanceId(((Number)
source.get(ServiceAlarmTable.COLUMN_INSTANCE_ID)).intValue());
+ serviceAlarm.setServiceId(((Number)
source.get(ServiceAlarmTable.COLUMN_SERVICE_ID)).intValue());
+ serviceAlarm.setSourceValue(((Number)
source.get(ServiceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
-
serviceAlarm.setAlarmType(((Number)source.get(ServiceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
-
serviceAlarm.setAlarmContent((String)source.get(ServiceAlarmTable.COLUMN_ALARM_CONTENT));
+ serviceAlarm.setAlarmType(((Number)
source.get(ServiceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+ serviceAlarm.setAlarmContent((String)
source.get(ServiceAlarmTable.COLUMN_ALARM_CONTENT));
-
serviceAlarm.setLastTimeBucket(((Number)source.get(ServiceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+ serviceAlarm.setLastTimeBucket(((Number)
source.get(ServiceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return serviceAlarm;
} else {
return null;
}
}
- @Override public IndexRequestBuilder prepareBatchInsert(ServiceAlarm data)
{
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(ServiceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceAlarmTable.COLUMN_APPLICATION_ID,
data.getApplicationId());
source.put(ServiceAlarmTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -81,7 +84,8 @@ public class ServiceAlarmEsPersistenceDAO extends EsDAO
implements IServiceAlarm
return getClient().prepareIndex(ServiceAlarmTable.TABLE,
data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(ServiceAlarm
data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(ServiceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceAlarmTable.COLUMN_APPLICATION_ID,
data.getApplicationId());
source.put(ServiceAlarmTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -96,13 +100,14 @@ public class ServiceAlarmEsPersistenceDAO extends EsDAO
implements IServiceAlarm
return getClient().prepareUpdate(ServiceAlarmTable.TABLE,
data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long
endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
-
.filter(QueryBuilders.rangeQuery(ServiceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(ServiceAlarmTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+
QueryBuilders.rangeQuery(ServiceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ ServiceAlarmTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted,
ServiceAlarmTable.TABLE);
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java
index 56bf3c8..6cb8175 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package
org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import
org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmListPersistenceDAO;
@@ -45,28 +46,30 @@ public class ServiceAlarmListEsPersistenceDAO extends EsDAO
implements IServiceA
super(client);
}
- @Override public ServiceAlarmList get(String id) {
+ @Override
+ public ServiceAlarmList get(String id) {
GetResponse getResponse =
getClient().prepareGet(ServiceAlarmListTable.TABLE, id).get();
if (getResponse.isExists()) {
ServiceAlarmList serviceAlarmList = new ServiceAlarmList();
serviceAlarmList.setId(id);
Map<String, Object> source = getResponse.getSource();
-
serviceAlarmList.setApplicationId(((Number)source.get(ServiceAlarmListTable.COLUMN_APPLICATION_ID)).intValue());
-
serviceAlarmList.setInstanceId(((Number)source.get(ServiceAlarmListTable.COLUMN_INSTANCE_ID)).intValue());
-
serviceAlarmList.setServiceId(((Number)source.get(ServiceAlarmListTable.COLUMN_SERVICE_ID)).intValue());
-
serviceAlarmList.setSourceValue(((Number)source.get(ServiceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
+ serviceAlarmList.setApplicationId(((Number)
source.get(ServiceAlarmListTable.COLUMN_APPLICATION_ID)).intValue());
+ serviceAlarmList.setInstanceId(((Number)
source.get(ServiceAlarmListTable.COLUMN_INSTANCE_ID)).intValue());
+ serviceAlarmList.setServiceId(((Number)
source.get(ServiceAlarmListTable.COLUMN_SERVICE_ID)).intValue());
+ serviceAlarmList.setSourceValue(((Number)
source.get(ServiceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
-
serviceAlarmList.setAlarmType(((Number)source.get(ServiceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
-
serviceAlarmList.setAlarmContent((String)source.get(ServiceAlarmListTable.COLUMN_ALARM_CONTENT));
+ serviceAlarmList.setAlarmType(((Number)
source.get(ServiceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
+ serviceAlarmList.setAlarmContent((String)
source.get(ServiceAlarmListTable.COLUMN_ALARM_CONTENT));
-
serviceAlarmList.setTimeBucket(((Number)source.get(ServiceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
+ serviceAlarmList.setTimeBucket(((Number)
source.get(ServiceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
return serviceAlarmList;
} else {
return null;
}
}
- @Override public IndexRequestBuilder prepareBatchInsert(ServiceAlarmList
data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(ServiceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceAlarmListTable.COLUMN_APPLICATION_ID,
data.getApplicationId());
source.put(ServiceAlarmListTable.COLUMN_INSTANCE_ID,
data.getInstanceId());
@@ -81,7 +84,8 @@ public class ServiceAlarmListEsPersistenceDAO extends EsDAO
implements IServiceA
return getClient().prepareIndex(ServiceAlarmListTable.TABLE,
data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder prepareBatchUpdate(ServiceAlarmList
data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(ServiceAlarmList data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceAlarmListTable.COLUMN_APPLICATION_ID,
data.getApplicationId());
source.put(ServiceAlarmListTable.COLUMN_INSTANCE_ID,
data.getInstanceId());
@@ -96,13 +100,14 @@ public class ServiceAlarmListEsPersistenceDAO extends
EsDAO implements IServiceA
return getClient().prepareUpdate(ServiceAlarmListTable.TABLE,
data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long
endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
-
.filter(QueryBuilders.rangeQuery(ServiceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(ServiceAlarmListTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+
QueryBuilders.rangeQuery(ServiceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ ServiceAlarmListTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted,
ServiceAlarmListTable.TABLE);
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java
index 6934afc..9400024 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package
org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import
org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmPersistenceDAO;
@@ -45,31 +46,33 @@ public class ServiceReferenceAlarmEsPersistenceDAO extends
EsDAO implements ISer
super(client);
}
- @Override public ServiceReferenceAlarm get(String id) {
+ @Override
+ public ServiceReferenceAlarm get(String id) {
GetResponse getResponse =
getClient().prepareGet(ServiceReferenceAlarmTable.TABLE, id).get();
if (getResponse.isExists()) {
ServiceReferenceAlarm serviceReferenceAlarm = new
ServiceReferenceAlarm();
serviceReferenceAlarm.setId(id);
Map<String, Object> source = getResponse.getSource();
-
serviceReferenceAlarm.setFrontApplicationId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
-
serviceReferenceAlarm.setBehindApplicationId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
-
serviceReferenceAlarm.setFrontInstanceId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
-
serviceReferenceAlarm.setBehindInstanceId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
-
serviceReferenceAlarm.setFrontServiceId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_SERVICE_ID)).intValue());
-
serviceReferenceAlarm.setBehindServiceId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
-
serviceReferenceAlarm.setSourceValue(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
-
-
serviceReferenceAlarm.setAlarmType(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
-
serviceReferenceAlarm.setAlarmContent((String)source.get(ServiceReferenceAlarmTable.COLUMN_ALARM_CONTENT));
-
-
serviceReferenceAlarm.setLastTimeBucket(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+ serviceReferenceAlarm.setFrontApplicationId(((Number)
source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+ serviceReferenceAlarm.setBehindApplicationId(((Number)
source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+ serviceReferenceAlarm.setFrontInstanceId(((Number)
source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
+ serviceReferenceAlarm.setBehindInstanceId(((Number)
source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
+ serviceReferenceAlarm.setFrontServiceId(((Number)
source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_SERVICE_ID)).intValue());
+ serviceReferenceAlarm.setBehindServiceId(((Number)
source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
+ serviceReferenceAlarm.setSourceValue(((Number)
source.get(ServiceReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+
+ serviceReferenceAlarm.setAlarmType(((Number)
source.get(ServiceReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+ serviceReferenceAlarm.setAlarmContent((String)
source.get(ServiceReferenceAlarmTable.COLUMN_ALARM_CONTENT));
+
+ serviceReferenceAlarm.setLastTimeBucket(((Number)
source.get(ServiceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
return serviceReferenceAlarm;
} else {
return null;
}
}
- @Override public IndexRequestBuilder
prepareBatchInsert(ServiceReferenceAlarm data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(ServiceReferenceAlarm data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID,
data.getFrontApplicationId());
source.put(ServiceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID,
data.getBehindApplicationId());
@@ -87,7 +90,8 @@ public class ServiceReferenceAlarmEsPersistenceDAO extends
EsDAO implements ISer
return getClient().prepareIndex(ServiceReferenceAlarmTable.TABLE,
data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder
prepareBatchUpdate(ServiceReferenceAlarm data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(ServiceReferenceAlarm data)
{
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID,
data.getFrontApplicationId());
source.put(ServiceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID,
data.getBehindApplicationId());
@@ -105,13 +109,14 @@ public class ServiceReferenceAlarmEsPersistenceDAO
extends EsDAO implements ISer
return getClient().prepareUpdate(ServiceReferenceAlarmTable.TABLE,
data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long
endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
-
.filter(QueryBuilders.rangeQuery(ServiceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(ServiceReferenceAlarmTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+
QueryBuilders.rangeQuery(ServiceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ ServiceReferenceAlarmTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted,
ServiceReferenceAlarmTable.TABLE);
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java
index b3a8e4e..3014ca1 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package
org.apache.skywalking.apm.collector.storage.es.dao.alarm;
import java.util.HashMap;
import java.util.Map;
+
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import
org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmListPersistenceDAO;
@@ -45,31 +46,33 @@ public class ServiceReferenceAlarmListEsPersistenceDAO
extends EsDAO implements
super(client);
}
- @Override public ServiceReferenceAlarmList get(String id) {
+ @Override
+ public ServiceReferenceAlarmList get(String id) {
GetResponse getResponse =
getClient().prepareGet(ServiceReferenceAlarmListTable.TABLE, id).get();
if (getResponse.isExists()) {
ServiceReferenceAlarmList serviceReferenceAlarmList = new
ServiceReferenceAlarmList();
serviceReferenceAlarmList.setId(id);
Map<String, Object> source = getResponse.getSource();
-
serviceReferenceAlarmList.setFrontApplicationId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
-
serviceReferenceAlarmList.setBehindApplicationId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
-
serviceReferenceAlarmList.setFrontInstanceId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
-
serviceReferenceAlarmList.setBehindInstanceId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
-
serviceReferenceAlarmList.setFrontServiceId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_SERVICE_ID)).intValue());
-
serviceReferenceAlarmList.setBehindServiceId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
-
serviceReferenceAlarmList.setSourceValue(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
-
-
serviceReferenceAlarmList.setAlarmType(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
-
serviceReferenceAlarmList.setAlarmContent((String)source.get(ServiceReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
-
-
serviceReferenceAlarmList.setTimeBucket(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
+ serviceReferenceAlarmList.setFrontApplicationId(((Number)
source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+ serviceReferenceAlarmList.setBehindApplicationId(((Number)
source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+ serviceReferenceAlarmList.setFrontInstanceId(((Number)
source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
+ serviceReferenceAlarmList.setBehindInstanceId(((Number)
source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
+ serviceReferenceAlarmList.setFrontServiceId(((Number)
source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_SERVICE_ID)).intValue());
+ serviceReferenceAlarmList.setBehindServiceId(((Number)
source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
+ serviceReferenceAlarmList.setSourceValue(((Number)
source.get(ServiceReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
+
+ serviceReferenceAlarmList.setAlarmType(((Number)
source.get(ServiceReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
+ serviceReferenceAlarmList.setAlarmContent((String)
source.get(ServiceReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
+
+ serviceReferenceAlarmList.setTimeBucket(((Number)
source.get(ServiceReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
return serviceReferenceAlarmList;
} else {
return null;
}
}
- @Override public IndexRequestBuilder
prepareBatchInsert(ServiceReferenceAlarmList data) {
+ @Override
+ public IndexRequestBuilder prepareBatchInsert(ServiceReferenceAlarmList
data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID,
data.getFrontApplicationId());
source.put(ServiceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID,
data.getBehindApplicationId());
@@ -87,7 +90,8 @@ public class ServiceReferenceAlarmListEsPersistenceDAO
extends EsDAO implements
return getClient().prepareIndex(ServiceReferenceAlarmListTable.TABLE,
data.getId()).setSource(source);
}
- @Override public UpdateRequestBuilder
prepareBatchUpdate(ServiceReferenceAlarmList data) {
+ @Override
+ public UpdateRequestBuilder prepareBatchUpdate(ServiceReferenceAlarmList
data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID,
data.getFrontApplicationId());
source.put(ServiceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID,
data.getBehindApplicationId());
@@ -105,13 +109,14 @@ public class ServiceReferenceAlarmListEsPersistenceDAO
extends EsDAO implements
return getClient().prepareUpdate(ServiceReferenceAlarmListTable.TABLE,
data.getId()).setDoc(source);
}
- @Override public void deleteHistory(Long startTimestamp, Long
endTimestamp) {
+ @Override
+ public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
- BulkByScrollResponse response = getClient().prepareDelete()
-
.filter(QueryBuilders.rangeQuery(ServiceReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
- .source(ServiceReferenceAlarmListTable.TABLE)
- .get();
+ BulkByScrollResponse response = getClient().prepareDelete(
+
QueryBuilders.rangeQuery(ServiceReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ ServiceReferenceAlarmListTable.TABLE)
+ .get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted,
ServiceReferenceAlarmListTable.TABLE);
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/CpuMetricEsUIDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/CpuMetricEsUIDAO.java
index bd07a48..1d4b1ea 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/CpuMetricEsUIDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/CpuMetricEsUIDAO.java
@@ -18,8 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
-import java.util.LinkedList;
-import java.util.List;
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.ICpuMetricUIDAO;
@@ -32,6 +30,9 @@ import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
+import java.util.LinkedList;
+import java.util.List;
+
/**
* @author peng-yongsheng
*/
@@ -41,22 +42,26 @@ public class CpuMetricEsUIDAO extends EsDAO implements
ICpuMetricUIDAO {
super(client);
}
- @Override public List<Integer> getCPUTrend(int instanceId, Step step,
List<DurationPoint> durationPoints) {
- MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+ @Override
+ public List<Integer> getCPUTrend(int instanceId, Step step,
List<DurationPoint> durationPoints) {
String tableName = TimePyramidTableNameBuilder.build(step,
CpuMetricTable.TABLE);
- durationPoints.forEach(durationPoint -> {
- String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId;
- prepareMultiGet.add(tableName, CpuMetricTable.TABLE_TYPE, id);
+ MultiGetRequestBuilder prepareMultiGet =
getClient().prepareMultiGet(durationPoints, new
ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+ @Override
+ public void accept(DurationPoint durationPoint) {
+ String id = durationPoint.getPoint() + Const.ID_SPLIT +
instanceId;
+ this.add(tableName, CpuMetricTable.TABLE_TYPE, id);
+ }
});
+
List<Integer> cpuTrends = new LinkedList<>();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
- double cpuUsed =
((Number)response.getResponse().getSource().get(CpuMetricTable.COLUMN_USAGE_PERCENT)).doubleValue();
- long times =
((Number)response.getResponse().getSource().get(CpuMetricTable.COLUMN_TIMES)).longValue();
- cpuTrends.add((int)((cpuUsed / times) * 100));
+ double cpuUsed = ((Number)
response.getResponse().getSource().get(CpuMetricTable.COLUMN_USAGE_PERCENT)).doubleValue();
+ long times = ((Number)
response.getResponse().getSource().get(CpuMetricTable.COLUMN_TIMES)).longValue();
+ cpuTrends.add((int) ((cpuUsed / times) * 100));
} else {
cpuTrends.add(0);
}
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/GCMetricEsUIDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/GCMetricEsUIDAO.java
index d983bcd..30db5bd 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/GCMetricEsUIDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/GCMetricEsUIDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.ui;
import java.util.LinkedList;
import java.util.List;
+
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.IGCMetricUIDAO;
@@ -42,30 +43,34 @@ public class GCMetricEsUIDAO extends EsDAO implements
IGCMetricUIDAO {
super(client);
}
- @Override public List<Integer> getYoungGCTrend(int instanceId, Step step,
List<DurationPoint> durationPoints) {
+ @Override
+ public List<Integer> getYoungGCTrend(int instanceId, Step step,
List<DurationPoint> durationPoints) {
return getGCTrend(instanceId, step, durationPoints,
GCPhrase.NEW_VALUE);
}
- @Override public List<Integer> getOldGCTrend(int instanceId, Step step,
List<DurationPoint> durationPoints) {
+ @Override
+ public List<Integer> getOldGCTrend(int instanceId, Step step,
List<DurationPoint> durationPoints) {
return getGCTrend(instanceId, step, durationPoints,
GCPhrase.OLD_VALUE);
}
private List<Integer> getGCTrend(int instanceId, Step step,
List<DurationPoint> durationPoints, int gcPhrase) {
String tableName = TimePyramidTableNameBuilder.build(step,
GCMetricTable.TABLE);
- MultiGetRequestBuilder youngPrepareMultiGet =
getClient().prepareMultiGet();
- durationPoints.forEach(durationPoint -> {
- String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId
+ Const.ID_SPLIT + gcPhrase;
- youngPrepareMultiGet.add(tableName, GCMetricTable.TABLE_TYPE, id);
+ MultiGetRequestBuilder youngPrepareMultiGet =
getClient().prepareMultiGet(durationPoints, new
ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+ @Override
+ public void accept(DurationPoint durationPoint) {
+ String id = durationPoint.getPoint() + Const.ID_SPLIT +
instanceId + Const.ID_SPLIT + gcPhrase;
+ add(tableName, GCMetricTable.TABLE_TYPE, id);
+ }
});
List<Integer> gcTrends = new LinkedList<>();
MultiGetResponse multiGetResponse = youngPrepareMultiGet.get();
for (MultiGetItemResponse itemResponse :
multiGetResponse.getResponses()) {
if (itemResponse.getResponse().isExists()) {
- long count =
((Number)itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_COUNT)).longValue();
- long times =
((Number)itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_TIMES)).intValue();
- gcTrends.add((int)(count / times));
+ long count = ((Number)
itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_COUNT)).longValue();
+ long times = ((Number)
itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_TIMES)).intValue();
+ gcTrends.add((int) (count / times));
} else {
gcTrends.add(0);
}
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java
index 701550b..3f6ae7f 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java
@@ -18,8 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
-import java.util.LinkedList;
-import java.util.List;
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceMetricUIDAO;
@@ -44,6 +42,9 @@ import
org.elasticsearch.search.aggregations.bucket.terms.Terms;
import
org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
+import java.util.LinkedList;
+import java.util.List;
+
/**
* @author peng-yongsheng
*/
@@ -53,8 +54,9 @@ public class InstanceMetricEsUIDAO extends EsDAO implements
IInstanceMetricUIDAO
super(client);
}
- @Override public List<AppServerInfo> getServerThroughput(int
applicationId, Step step, long startTimeBucket,
- long endTimeBucket, int secondBetween, int topN, MetricSource
metricSource) {
+ @Override
+ public List<AppServerInfo> getServerThroughput(int applicationId, Step
step, long startTimeBucket,
+ long endTimeBucket, int
secondBetween, int topN, MetricSource metricSource) {
String tableName = TimePyramidTableNameBuilder.build(step,
InstanceMetricTable.TABLE);
SearchRequestBuilder searchRequestBuilder =
getClient().prepareSearch(tableName);
@@ -82,8 +84,8 @@ public class InstanceMetricEsUIDAO extends EsDAO implements
IInstanceMetricUIDAO
instanceIdTerms.getBuckets().forEach(instanceIdTerm -> {
int instanceId = instanceIdTerm.getKeyAsNumber().intValue();
Sum callSum =
instanceIdTerm.getAggregations().get(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS);
- long calls = (long)callSum.getValue();
- int callsPerSec = (int)(secondBetween == 0 ? 0 : calls /
secondBetween);
+ long calls = (long) callSum.getValue();
+ int callsPerSec = (int) (secondBetween == 0 ? 0 : calls /
secondBetween);
AppServerInfo appServerInfo = new AppServerInfo();
appServerInfo.setId(instanceId);
@@ -103,13 +105,15 @@ public class InstanceMetricEsUIDAO extends EsDAO
implements IInstanceMetricUIDAO
}
}
- @Override public List<Integer> getServerTPSTrend(int instanceId, Step
step, List<DurationPoint> durationPoints) {
- MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+ @Override
+ public List<Integer> getServerTPSTrend(int instanceId, Step step,
List<DurationPoint> durationPoints) {
String tableName = TimePyramidTableNameBuilder.build(step,
InstanceMetricTable.TABLE);
-
- durationPoints.forEach(durationPoint -> {
- String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId
+ Const.ID_SPLIT + MetricSource.Callee.getValue();
- prepareMultiGet.add(tableName, InstanceMetricTable.TABLE_TYPE, id);
+ MultiGetRequestBuilder prepareMultiGet =
getClient().prepareMultiGet(durationPoints, new
ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+ @Override
+ public void accept(DurationPoint durationPoint) {
+ String id = durationPoint.getPoint() + Const.ID_SPLIT +
instanceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
+ add(tableName, InstanceMetricTable.TABLE_TYPE, id);
+ }
});
List<Integer> throughputTrend = new LinkedList<>();
@@ -118,8 +122,8 @@ public class InstanceMetricEsUIDAO extends EsDAO implements
IInstanceMetricUIDAO
for (int i = 0; i < multiGetResponse.getResponses().length; i++) {
MultiGetItemResponse response = multiGetResponse.getResponses()[i];
if (response.getResponse().isExists()) {
- long callTimes =
((Number)response.getResponse().getSource().get(InstanceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
- throughputTrend.add((int)(callTimes /
durationPoints.get(i).getSecondsBetween()));
+ long callTimes = ((Number)
response.getResponse().getSource().get(InstanceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
+ throughputTrend.add((int) (callTimes /
durationPoints.get(i).getSecondsBetween()));
} else {
throughputTrend.add(0);
}
@@ -127,24 +131,28 @@ public class InstanceMetricEsUIDAO extends EsDAO
implements IInstanceMetricUIDAO
return throughputTrend;
}
- @Override public List<Integer> getResponseTimeTrend(int instanceId, Step
step, List<DurationPoint> durationPoints) {
- MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+ @Override
+ public List<Integer> getResponseTimeTrend(int instanceId, Step step,
List<DurationPoint> durationPoints) {
String tableName = TimePyramidTableNameBuilder.build(step,
InstanceMetricTable.TABLE);
+ MultiGetRequestBuilder prepareMultiGet =
getClient().prepareMultiGet(durationPoints, new
ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+ @Override
+ public void accept(DurationPoint durationPoint) {
+ String id = durationPoint.getPoint() + Const.ID_SPLIT +
instanceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
+ add(tableName, InstanceMetricTable.TABLE_TYPE, id);
+ }
- durationPoints.forEach(durationPoint -> {
- String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId
+ Const.ID_SPLIT + MetricSource.Callee.getValue();
- prepareMultiGet.add(tableName, InstanceMetricTable.TABLE_TYPE, id);
});
+
List<Integer> responseTimeTrends = new LinkedList<>();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
- long callTimes =
((Number)response.getResponse().getSource().get(InstanceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
- long errorCallTimes =
((Number)response.getResponse().getSource().get(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
- long durationSum =
((Number)response.getResponse().getSource().get(InstanceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)).longValue();
- long errorDurationSum =
((Number)response.getResponse().getSource().get(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM)).longValue();
- responseTimeTrends.add((int)((durationSum - errorDurationSum)
/ (callTimes - errorCallTimes)));
+ long callTimes = ((Number)
response.getResponse().getSource().get(InstanceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
+ long errorCallTimes = ((Number)
response.getResponse().getSource().get(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
+ long durationSum = ((Number)
response.getResponse().getSource().get(InstanceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)).longValue();
+ long errorDurationSum = ((Number)
response.getResponse().getSource().get(InstanceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM)).longValue();
+ responseTimeTrends.add((int) ((durationSum - errorDurationSum)
/ (callTimes - errorCallTimes)));
} else {
responseTimeTrends.add(0);
}
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryMetricEsUIDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryMetricEsUIDAO.java
index 6e5e891..fb30adf 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryMetricEsUIDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryMetricEsUIDAO.java
@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
-import java.util.List;
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.BooleanUtils;
import org.apache.skywalking.apm.collector.core.util.Const;
@@ -32,6 +31,8 @@ import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
+import java.util.List;
+
/**
* @author peng-yongsheng
*/
@@ -41,38 +42,42 @@ public class MemoryMetricEsUIDAO extends EsDAO implements
IMemoryMetricUIDAO {
super(client);
}
- @Override public Trend getHeapMemoryTrend(int instanceId, Step step,
List<DurationPoint> durationPoints) {
+ @Override
+ public Trend getHeapMemoryTrend(int instanceId, Step step,
List<DurationPoint> durationPoints) {
return getMemoryTrend(instanceId, step, durationPoints, true);
}
- @Override public Trend getNoHeapMemoryTrend(int instanceId, Step step,
List<DurationPoint> durationPoints) {
+ @Override
+ public Trend getNoHeapMemoryTrend(int instanceId, Step step,
List<DurationPoint> durationPoints) {
return getMemoryTrend(instanceId, step, durationPoints, false);
}
private Trend getMemoryTrend(int instanceId, Step step,
List<DurationPoint> durationPoints,
- boolean isHeap) {
+ boolean isHeap) {
String tableName = TimePyramidTableNameBuilder.build(step,
MemoryMetricTable.TABLE);
- MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+ MultiGetRequestBuilder prepareMultiGet =
getClient().prepareMultiGet(durationPoints, new
ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+ @Override
+ public void accept(DurationPoint durationPoint) {
+ String id = durationPoint.getPoint() + Const.ID_SPLIT +
instanceId + Const.ID_SPLIT + BooleanUtils.booleanToValue(isHeap);
+ add(tableName, MemoryMetricTable.TABLE_TYPE, id);
+ }
- durationPoints.forEach(durationPoint -> {
- String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId
+ Const.ID_SPLIT + BooleanUtils.booleanToValue(isHeap);
- prepareMultiGet.add(tableName, MemoryMetricTable.TABLE_TYPE, id);
});
Trend trend = new Trend();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
- long max =
((Number)response.getResponse().getSource().get(MemoryMetricTable.COLUMN_MAX)).longValue();
- long used =
((Number)response.getResponse().getSource().get(MemoryMetricTable.COLUMN_USED)).longValue();
- long times =
((Number)response.getResponse().getSource().get(MemoryMetricTable.COLUMN_TIMES)).longValue();
+ long max = ((Number)
response.getResponse().getSource().get(MemoryMetricTable.COLUMN_MAX)).longValue();
+ long used = ((Number)
response.getResponse().getSource().get(MemoryMetricTable.COLUMN_USED)).longValue();
+ long times = ((Number)
response.getResponse().getSource().get(MemoryMetricTable.COLUMN_TIMES)).longValue();
- trend.getMetrics().add((int)(used / times));
+ trend.getMetrics().add((int) (used / times));
if (max < 0) {
- trend.getMaxMetrics().add((int)(used / times));
+ trend.getMaxMetrics().add((int) (used / times));
} else {
- trend.getMaxMetrics().add((int)(max / times));
+ trend.getMaxMetrics().add((int) (max / times));
}
} else {
trend.getMetrics().add(0);
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
index 2201b43..5fdbee5 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.ui;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.util.Const;
import
org.apache.skywalking.apm.collector.storage.dao.ui.IMemoryPoolMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
@@ -57,31 +58,6 @@ public class MemoryPoolMetricEsUIDAO extends EsDAO
implements IMemoryPoolMetricU
}
@Override public JsonObject getMetric(int instanceId, long
startTimeBucket, long endTimeBucket, int poolType) {
- MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
-
- long timeBucket = startTimeBucket;
- do {
-// timeBucket =
TimeBucketUtils.INSTANCE.addSecondForSecondTimeBucket(TimeBucketUtils.TimeBucketType.SECOND,
timeBucket, 1);
- String id = timeBucket + Const.ID_SPLIT + instanceId +
Const.ID_SPLIT + poolType;
- prepareMultiGet.add(MemoryPoolMetricTable.TABLE,
MemoryPoolMetricTable.TABLE_TYPE, id);
- }
- while (timeBucket <= endTimeBucket);
-
- JsonObject metric = new JsonObject();
- JsonArray usedMetric = new JsonArray();
- MultiGetResponse multiGetResponse = prepareMultiGet.get();
- for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
- if (response.getResponse().isExists()) {
- metric.addProperty("max",
((Number)response.getResponse().getSource().get(MemoryPoolMetricTable.COLUMN_MAX)).longValue());
- metric.addProperty("init",
((Number)response.getResponse().getSource().get(MemoryPoolMetricTable.COLUMN_INIT)).longValue());
-
usedMetric.add(((Number)response.getResponse().getSource().get(MemoryPoolMetricTable.COLUMN_USED)).longValue());
- } else {
- metric.addProperty("max", 0);
- metric.addProperty("init", 0);
- usedMetric.add(0);
- }
- }
- metric.add("used", usedMetric);
- return metric;
+ throw new UnexpectedException("Not implement method");
}
}
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java
index 96761a1..34ae413 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java
@@ -18,11 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceMetricUIDAO;
@@ -51,6 +46,8 @@ import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
+import java.util.*;
+
/**
* @author peng-yongsheng
*/
@@ -62,23 +59,24 @@ public class ServiceMetricEsUIDAO extends EsDAO implements
IServiceMetricUIDAO {
@Override
public List<Integer> getServiceResponseTimeTrend(int serviceId, Step step,
List<DurationPoint> durationPoints) {
- MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
String tableName = TimePyramidTableNameBuilder.build(step,
ServiceMetricTable.TABLE);
-
- durationPoints.forEach(durationPoint -> {
- String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId
+ Const.ID_SPLIT + MetricSource.Callee.getValue();
- prepareMultiGet.add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+ MultiGetRequestBuilder prepareMultiGet =
getClient().prepareMultiGet(durationPoints, new
ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+ @Override
+ public void accept(DurationPoint durationPoint) {
+ String id = durationPoint.getPoint() + Const.ID_SPLIT +
serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
+ add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+ }
});
List<Integer> trends = new LinkedList<>();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
- long calls =
((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
- long errorCalls =
((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
- long durationSum =
((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)).longValue();
- long errorDurationSum =
((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM)).longValue();
- trends.add((int)((durationSum - errorDurationSum) / (calls -
errorCalls)));
+ long calls = ((Number)
response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
+ long errorCalls = ((Number)
response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
+ long durationSum = ((Number)
response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)).longValue();
+ long errorDurationSum = ((Number)
response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM)).longValue();
+ trends.add((int) ((durationSum - errorDurationSum) / (calls -
errorCalls)));
} else {
trends.add(0);
}
@@ -86,13 +84,15 @@ public class ServiceMetricEsUIDAO extends EsDAO implements
IServiceMetricUIDAO {
return trends;
}
- @Override public List<Integer> getServiceTPSTrend(int serviceId, Step
step, List<DurationPoint> durationPoints) {
- MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+ @Override
+ public List<Integer> getServiceTPSTrend(int serviceId, Step step,
List<DurationPoint> durationPoints) {
String tableName = TimePyramidTableNameBuilder.build(step,
ServiceMetricTable.TABLE);
-
- durationPoints.forEach(durationPoint -> {
- String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId
+ Const.ID_SPLIT + MetricSource.Callee.getValue();
- prepareMultiGet.add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+ MultiGetRequestBuilder prepareMultiGet =
getClient().prepareMultiGet(durationPoints, new
ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+ @Override
+ public void accept(DurationPoint durationPoint) {
+ String id = durationPoint.getPoint() + Const.ID_SPLIT +
serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
+ add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+ }
});
List<Integer> trends = new LinkedList<>();
@@ -101,9 +101,9 @@ public class ServiceMetricEsUIDAO extends EsDAO implements
IServiceMetricUIDAO {
int index = 0;
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
- long calls =
((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
+ long calls = ((Number)
response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
long secondBetween =
durationPoints.get(index).getSecondsBetween();
- trends.add((int)(calls / secondBetween));
+ trends.add((int) (calls / secondBetween));
} else {
trends.add(0);
}
@@ -112,22 +112,24 @@ public class ServiceMetricEsUIDAO extends EsDAO
implements IServiceMetricUIDAO {
return trends;
}
- @Override public List<Integer> getServiceSLATrend(int serviceId, Step
step, List<DurationPoint> durationPoints) {
- MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+ @Override
+ public List<Integer> getServiceSLATrend(int serviceId, Step step,
List<DurationPoint> durationPoints) {
String tableName = TimePyramidTableNameBuilder.build(step,
ServiceMetricTable.TABLE);
-
- durationPoints.forEach(durationPoint -> {
- String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId
+ Const.ID_SPLIT + MetricSource.Callee.getValue();
- prepareMultiGet.add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+ MultiGetRequestBuilder prepareMultiGet =
getClient().prepareMultiGet(durationPoints, new
ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+ @Override
+ public void accept(DurationPoint durationPoint) {
+ String id = durationPoint.getPoint() + Const.ID_SPLIT +
serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
+ add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+ }
});
List<Integer> trends = new LinkedList<>();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
- long calls =
((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
- long errorCalls =
((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
- trends.add((int)(((calls - errorCalls) / calls)) * 10000);
+ long calls = ((Number)
response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
+ long errorCalls = ((Number)
response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
+ trends.add((int) (((calls - errorCalls) / calls)) * 10000);
} else {
trends.add(10000);
}
@@ -137,7 +139,7 @@ public class ServiceMetricEsUIDAO extends EsDAO implements
IServiceMetricUIDAO {
@Override
public List<Node> getServicesMetric(Step step, long startTime, long
endTime, MetricSource metricSource,
- Collection<Integer> serviceIds) {
+ Collection<Integer> serviceIds) {
String tableName = TimePyramidTableNameBuilder.build(step,
ServiceMetricTable.TABLE);
SearchRequestBuilder searchRequestBuilder =
getClient().prepareSearch(tableName);
@@ -169,8 +171,8 @@ public class ServiceMetricEsUIDAO extends EsDAO implements
IServiceMetricUIDAO {
ServiceNode serviceNode = new ServiceNode();
serviceNode.setId(serviceId);
- serviceNode.setCalls((long)callsSum.getValue());
- serviceNode.setSla((int)(((callsSum.getValue() -
errorCallsSum.getValue()) / callsSum.getValue()) * 10000));
+ serviceNode.setCalls((long) callsSum.getValue());
+ serviceNode.setSla((int) (((callsSum.getValue() -
errorCallsSum.getValue()) / callsSum.getValue()) * 10000));
nodes.add(serviceNode);
});
return nodes;
@@ -178,7 +180,7 @@ public class ServiceMetricEsUIDAO extends EsDAO implements
IServiceMetricUIDAO {
@Override
public List<ServiceMetric> getSlowService(int applicationId, Step step,
long startTimeBucket, long endTimeBucket,
- Integer topN, MetricSource metricSource) {
+ Integer topN, MetricSource
metricSource) {
String tableName = TimePyramidTableNameBuilder.build(step,
ServiceMetricTable.TABLE);
SearchRequestBuilder searchRequestBuilder =
getClient().prepareSearch(tableName);
@@ -202,12 +204,12 @@ public class ServiceMetricEsUIDAO extends EsDAO
implements IServiceMetricUIDAO {
Set<Integer> serviceIds = new HashSet<>();
List<ServiceMetric> serviceMetrics = new LinkedList<>();
for (SearchHit searchHit : searchHits) {
- int serviceId =
((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_SERVICE_ID)).intValue();
+ int serviceId = ((Number)
searchHit.getSource().get(ServiceMetricTable.COLUMN_SERVICE_ID)).intValue();
if (!serviceIds.contains(serviceId)) {
ServiceMetric serviceMetric = new ServiceMetric();
serviceMetric.setId(serviceId);
-
serviceMetric.setCalls(((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue());
-
serviceMetric.setAvgResponseTime(((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_AVERAGE_DURATION)).intValue());
+ serviceMetric.setCalls(((Number)
searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue());
+ serviceMetric.setAvgResponseTime(((Number)
searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_AVERAGE_DURATION)).intValue());
serviceMetrics.add(serviceMetric);
serviceIds.add(serviceId);
--
To stop receiving notification emails like this one, please contact
[email protected].