This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch es/namespace
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/es/namespace by this push:
new 4843747 Finish all codes about ES namespace feature.
4843747 is described below
commit 4843747907f2b9d0335e957a0c81d27257f823bd
Author: wusheng <[email protected]>
AuthorDate: Wed Mar 21 16:35:16 2018 +0800
Finish all codes about ES namespace feature.
---
.../client/elasticsearch/ElasticSearchClient.java | 11 --------
.../es/base/dao/AbstractPersistenceEsDAO.java | 8 +++---
.../alarm/ApplicationAlarmEsPersistenceDAO.java | 10 +++----
.../AbstractMemoryPoolMetricEsPersistenceDAO.java | 32 ++++++++++++----------
.../es/dao/register/InstanceRegisterEsDAO.java | 32 +++++++++++-----------
.../storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java | 18 ++++++------
6 files changed, 51 insertions(+), 60 deletions(-)
diff --git
a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
index 6d2f866..d58b955 100644
---
a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
+++
b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
@@ -30,7 +30,6 @@ import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.Nullable;
@@ -46,10 +45,8 @@ import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
/**
@@ -210,14 +207,6 @@ public class ElasticSearchClient implements Client {
return client.prepareBulk();
}
- public void update(UpdateRequest updateRequest) {
- try {
- client.update(updateRequest).get();
- } catch (InterruptedException | ExecutionException e) {
- logger.error(e.getMessage(), e);
- }
- }
-
private String formatIndexName(String indexName) {
return formatIndexName(this.namespace, indexName);
}
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
index 0c44eca..eb22185 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
@@ -18,8 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.base.dao;
-import java.util.Map;
-
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
@@ -32,6 +30,8 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+
/**
* @author peng-yongsheng
*/
@@ -80,8 +80,8 @@ public abstract class AbstractPersistenceEsDAO<STREAM_DATA
extends StreamData> e
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete(
-
QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).gte(startTimeBucket).lte(endTimeBucket)
- , tableName())
+
QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).gte(startTimeBucket).lte(endTimeBucket),
+ tableName())
.get();
long deleted = response.getDeleted();
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
index b1a4bda..d3d5603 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
@@ -18,9 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
-import java.util.HashMap;
-import java.util.Map;
-
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import
org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmPersistenceDAO;
@@ -35,6 +32,9 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* @author peng-yongsheng
*/
@@ -100,8 +100,8 @@ public class ApplicationAlarmEsPersistenceDAO extends EsDAO
implements IApplicat
long startTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete(
-
QueryBuilders.rangeQuery(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket)
- , ApplicationAlarmTable.TABLE)
+
QueryBuilders.rangeQuery(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+ ApplicationAlarmTable.TABLE)
.get();
long deleted = response.getDeleted();
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
index 2d10fdf..4741f2e 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
@@ -18,13 +18,14 @@
package org.apache.skywalking.apm.collector.storage.es.dao.mpool;
-import java.util.HashMap;
-import java.util.Map;
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import
org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
import
org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* @author peng-yongsheng
*/
@@ -34,28 +35,31 @@ public abstract class
AbstractMemoryPoolMetricEsPersistenceDAO extends AbstractP
super(client);
}
- @Override protected final String timeBucketColumnNameForDelete() {
+ @Override
+ protected final String timeBucketColumnNameForDelete() {
return MemoryPoolMetricTable.COLUMN_TIME_BUCKET;
}
- @Override protected final MemoryPoolMetric esDataToStreamData(Map<String,
Object> source) {
+ @Override
+ protected final MemoryPoolMetric esDataToStreamData(Map<String, Object>
source) {
MemoryPoolMetric memoryPoolMetric = new MemoryPoolMetric();
-
memoryPoolMetric.setMetricId((String)source.get(MemoryPoolMetricTable.COLUMN_METRIC_ID));
+ memoryPoolMetric.setMetricId((String)
source.get(MemoryPoolMetricTable.COLUMN_METRIC_ID));
-
memoryPoolMetric.setInstanceId(((Number)source.get(MemoryPoolMetricTable.COLUMN_INSTANCE_ID)).intValue());
-
memoryPoolMetric.setPoolType(((Number)source.get(MemoryPoolMetricTable.COLUMN_POOL_TYPE)).intValue());
+ memoryPoolMetric.setInstanceId(((Number)
source.get(MemoryPoolMetricTable.COLUMN_INSTANCE_ID)).intValue());
+ memoryPoolMetric.setPoolType(((Number)
source.get(MemoryPoolMetricTable.COLUMN_POOL_TYPE)).intValue());
-
memoryPoolMetric.setInit(((Number)source.get(MemoryPoolMetricTable.COLUMN_INIT)).longValue());
-
memoryPoolMetric.setMax(((Number)source.get(MemoryPoolMetricTable.COLUMN_MAX)).longValue());
-
memoryPoolMetric.setUsed(((Number)source.get(MemoryPoolMetricTable.COLUMN_USED)).longValue());
-
memoryPoolMetric.setCommitted(((Number)source.get(MemoryPoolMetricTable.COLUMN_COMMITTED)).longValue());
-
memoryPoolMetric.setTimes(((Number)source.get(MemoryPoolMetricTable.COLUMN_TIMES)).longValue());
+ memoryPoolMetric.setInit(((Number)
source.get(MemoryPoolMetricTable.COLUMN_INIT)).longValue());
+ memoryPoolMetric.setMax(((Number)
source.get(MemoryPoolMetricTable.COLUMN_MAX)).longValue());
+ memoryPoolMetric.setUsed(((Number)
source.get(MemoryPoolMetricTable.COLUMN_USED)).longValue());
+ memoryPoolMetric.setCommitted(((Number)
source.get(MemoryPoolMetricTable.COLUMN_COMMITTED)).longValue());
+ memoryPoolMetric.setTimes(((Number)
source.get(MemoryPoolMetricTable.COLUMN_TIMES)).longValue());
-
memoryPoolMetric.setTimeBucket(((Number)source.get(MemoryPoolMetricTable.COLUMN_TIME_BUCKET)).longValue());
+ memoryPoolMetric.setTimeBucket(((Number)
source.get(MemoryPoolMetricTable.COLUMN_TIME_BUCKET)).longValue());
return memoryPoolMetric;
}
- @Override protected final Map<String, Object>
esStreamDataToEsData(MemoryPoolMetric streamData) {
+ @Override
+ protected final Map<String, Object> esStreamDataToEsData(MemoryPoolMetric
streamData) {
Map<String, Object> source = new HashMap<>();
source.put(MemoryPoolMetricTable.COLUMN_METRIC_ID,
streamData.getMetricId());
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/InstanceRegisterEsDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/InstanceRegisterEsDAO.java
index 074ec58..93a75dc 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/InstanceRegisterEsDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/InstanceRegisterEsDAO.java
@@ -18,8 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.register;
-import java.util.HashMap;
-import java.util.Map;
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import
org.apache.skywalking.apm.collector.storage.dao.register.IInstanceRegisterDAO;
@@ -28,10 +26,13 @@ import
org.apache.skywalking.apm.collector.storage.table.register.Instance;
import
org.apache.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* @author peng-yongsheng
*/
@@ -43,15 +44,18 @@ public class InstanceRegisterEsDAO extends EsDAO implements
IInstanceRegisterDAO
super(client);
}
- @Override public int getMaxInstanceId() {
+ @Override
+ public int getMaxInstanceId() {
return getMaxId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
}
- @Override public int getMinInstanceId() {
+ @Override
+ public int getMinInstanceId() {
return getMinId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
}
- @Override public void save(Instance instance) {
+ @Override
+ public void save(Instance instance) {
logger.debug("save instance register info, application
getApplicationId: {}, agentUUID: {}", instance.getApplicationId(),
instance.getAgentUUID());
ElasticSearchClient client = getClient();
Map<String, Object> source = new HashMap<>();
@@ -69,18 +73,14 @@ public class InstanceRegisterEsDAO extends EsDAO implements
IInstanceRegisterDAO
logger.debug("save instance register info, application
getApplicationId: {}, agentUUID: {}, status: {}", instance.getApplicationId(),
instance.getAgentUUID(), response.status().name());
}
- @Override public void updateHeartbeatTime(int instanceId, long
heartbeatTime) {
- ElasticSearchClient client = getClient();
- UpdateRequest updateRequest = new UpdateRequest();
- updateRequest.index(InstanceTable.TABLE);
- updateRequest.type(InstanceTable.TABLE_TYPE);
- updateRequest.id(String.valueOf(instanceId));
- updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-
+ @Override
+ public void updateHeartbeatTime(int instanceId, long heartbeatTime) {
+ UpdateRequestBuilder updateRequestBuilder =
getClient().prepareUpdate(InstanceTable.TABLE, String.valueOf(instanceId));
+
updateRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
Map<String, Object> source = new HashMap<>();
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME,
TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartbeatTime));
+ updateRequestBuilder.setDoc(source);
- updateRequest.doc(source);
- client.update(updateRequest);
+ updateRequestBuilder.get();
}
}
diff --git
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
index 5fdbee5..5f769b7 100644
---
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
+++
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
-import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
@@ -27,9 +26,6 @@ import
org.apache.skywalking.apm.collector.storage.dao.ui.IMemoryPoolMetricUIDAO
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import
org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.get.MultiGetItemResponse;
-import org.elasticsearch.action.get.MultiGetRequestBuilder;
-import org.elasticsearch.action.get.MultiGetResponse;
/**
* @author peng-yongsheng
@@ -40,15 +36,16 @@ public class MemoryPoolMetricEsUIDAO extends EsDAO
implements IMemoryPoolMetricU
super(client);
}
- @Override public JsonObject getMetric(int instanceId, long timeBucket, int
poolType) {
+ @Override
+ public JsonObject getMetric(int instanceId, long timeBucket, int poolType)
{
String id = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT
+ poolType;
GetResponse getResponse =
getClient().prepareGet(MemoryPoolMetricTable.TABLE, id).get();
JsonObject metric = new JsonObject();
if (getResponse.isExists()) {
- metric.addProperty("max",
((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_MAX)).intValue());
- metric.addProperty("init",
((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_INIT)).intValue());
- metric.addProperty("used",
((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_USED)).intValue());
+ metric.addProperty("max", ((Number)
getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_MAX)).intValue());
+ metric.addProperty("init", ((Number)
getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_INIT)).intValue());
+ metric.addProperty("used", ((Number)
getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_USED)).intValue());
} else {
metric.addProperty("max", 0);
metric.addProperty("init", 0);
@@ -57,7 +54,8 @@ public class MemoryPoolMetricEsUIDAO extends EsDAO implements
IMemoryPoolMetricU
return metric;
}
- @Override public JsonObject getMetric(int instanceId, long
startTimeBucket, long endTimeBucket, int poolType) {
- throw new UnexpectedException("Not implement method");
+ @Override
+ public JsonObject getMetric(int instanceId, long startTimeBucket, long
endTimeBucket, int poolType) {
+ throw new UnexpectedException("Not implement method");
}
}
--
To stop receiving notification emails like this one, please contact
[email protected].