This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch es/namespace
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/es/namespace by this push:
     new 4843747  Finish all codes about ES namespace feature.
4843747 is described below

commit 4843747907f2b9d0335e957a0c81d27257f823bd
Author: wusheng <[email protected]>
AuthorDate: Wed Mar 21 16:35:16 2018 +0800

    Finish all codes about ES namespace feature.
---
 .../client/elasticsearch/ElasticSearchClient.java  | 11 --------
 .../es/base/dao/AbstractPersistenceEsDAO.java      |  8 +++---
 .../alarm/ApplicationAlarmEsPersistenceDAO.java    | 10 +++----
 .../AbstractMemoryPoolMetricEsPersistenceDAO.java  | 32 ++++++++++++----------
 .../es/dao/register/InstanceRegisterEsDAO.java     | 32 +++++++++++-----------
 .../storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java | 18 ++++++------
 6 files changed, 51 insertions(+), 60 deletions(-)

diff --git 
a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
 
b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
index 6d2f866..d58b955 100644
--- 
a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
+++ 
b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
@@ -30,7 +30,6 @@ import org.elasticsearch.action.get.GetRequestBuilder;
 import org.elasticsearch.action.get.MultiGetRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
 import org.elasticsearch.client.IndicesAdminClient;
 import org.elasticsearch.common.Nullable;
@@ -46,10 +45,8 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
 
 /**
@@ -210,14 +207,6 @@ public class ElasticSearchClient implements Client {
         return client.prepareBulk();
     }
 
-    public void update(UpdateRequest updateRequest) {
-        try {
-            client.update(updateRequest).get();
-        } catch (InterruptedException | ExecutionException e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-
     private String formatIndexName(String indexName) {
         return formatIndexName(this.namespace, indexName);
     }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
 
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
index 0c44eca..eb22185 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
@@ -18,8 +18,6 @@
 
 package org.apache.skywalking.apm.collector.storage.es.base.dao;
 
-import java.util.Map;
-
 import 
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
@@ -32,6 +30,8 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 /**
  * @author peng-yongsheng
  */
@@ -80,8 +80,8 @@ public abstract class AbstractPersistenceEsDAO<STREAM_DATA 
extends StreamData> e
         long startTimeBucket = 
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = 
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
         BulkByScrollResponse response = getClient().prepareDelete(
-                
QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).gte(startTimeBucket).lte(endTimeBucket)
-                , tableName())
+                
QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).gte(startTimeBucket).lte(endTimeBucket),
+                tableName())
                 .get();
 
         long deleted = response.getDeleted();
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
 
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
index b1a4bda..d3d5603 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
@@ -18,9 +18,6 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import 
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import 
org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmPersistenceDAO;
@@ -35,6 +32,9 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * @author peng-yongsheng
  */
@@ -100,8 +100,8 @@ public class ApplicationAlarmEsPersistenceDAO extends EsDAO 
implements IApplicat
         long startTimeBucket = 
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = 
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
         BulkByScrollResponse response = getClient().prepareDelete(
-                
QueryBuilders.rangeQuery(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket)
-                , ApplicationAlarmTable.TABLE)
+                
QueryBuilders.rangeQuery(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+                ApplicationAlarmTable.TABLE)
                 .get();
 
         long deleted = response.getDeleted();
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
 
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
index 2d10fdf..4741f2e 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
@@ -18,13 +18,14 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.mpool;
 
-import java.util.HashMap;
-import java.util.Map;
 import 
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import 
org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
 import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
 import 
org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * @author peng-yongsheng
  */
@@ -34,28 +35,31 @@ public abstract class 
AbstractMemoryPoolMetricEsPersistenceDAO extends AbstractP
         super(client);
     }
 
-    @Override protected final String timeBucketColumnNameForDelete() {
+    @Override
+    protected final String timeBucketColumnNameForDelete() {
         return MemoryPoolMetricTable.COLUMN_TIME_BUCKET;
     }
 
-    @Override protected final MemoryPoolMetric esDataToStreamData(Map<String, 
Object> source) {
+    @Override
+    protected final MemoryPoolMetric esDataToStreamData(Map<String, Object> 
source) {
         MemoryPoolMetric memoryPoolMetric = new MemoryPoolMetric();
-        
memoryPoolMetric.setMetricId((String)source.get(MemoryPoolMetricTable.COLUMN_METRIC_ID));
+        memoryPoolMetric.setMetricId((String) 
source.get(MemoryPoolMetricTable.COLUMN_METRIC_ID));
 
-        
memoryPoolMetric.setInstanceId(((Number)source.get(MemoryPoolMetricTable.COLUMN_INSTANCE_ID)).intValue());
-        
memoryPoolMetric.setPoolType(((Number)source.get(MemoryPoolMetricTable.COLUMN_POOL_TYPE)).intValue());
+        memoryPoolMetric.setInstanceId(((Number) 
source.get(MemoryPoolMetricTable.COLUMN_INSTANCE_ID)).intValue());
+        memoryPoolMetric.setPoolType(((Number) 
source.get(MemoryPoolMetricTable.COLUMN_POOL_TYPE)).intValue());
 
-        
memoryPoolMetric.setInit(((Number)source.get(MemoryPoolMetricTable.COLUMN_INIT)).longValue());
-        
memoryPoolMetric.setMax(((Number)source.get(MemoryPoolMetricTable.COLUMN_MAX)).longValue());
-        
memoryPoolMetric.setUsed(((Number)source.get(MemoryPoolMetricTable.COLUMN_USED)).longValue());
-        
memoryPoolMetric.setCommitted(((Number)source.get(MemoryPoolMetricTable.COLUMN_COMMITTED)).longValue());
-        
memoryPoolMetric.setTimes(((Number)source.get(MemoryPoolMetricTable.COLUMN_TIMES)).longValue());
+        memoryPoolMetric.setInit(((Number) 
source.get(MemoryPoolMetricTable.COLUMN_INIT)).longValue());
+        memoryPoolMetric.setMax(((Number) 
source.get(MemoryPoolMetricTable.COLUMN_MAX)).longValue());
+        memoryPoolMetric.setUsed(((Number) 
source.get(MemoryPoolMetricTable.COLUMN_USED)).longValue());
+        memoryPoolMetric.setCommitted(((Number) 
source.get(MemoryPoolMetricTable.COLUMN_COMMITTED)).longValue());
+        memoryPoolMetric.setTimes(((Number) 
source.get(MemoryPoolMetricTable.COLUMN_TIMES)).longValue());
 
-        
memoryPoolMetric.setTimeBucket(((Number)source.get(MemoryPoolMetricTable.COLUMN_TIME_BUCKET)).longValue());
+        memoryPoolMetric.setTimeBucket(((Number) 
source.get(MemoryPoolMetricTable.COLUMN_TIME_BUCKET)).longValue());
         return memoryPoolMetric;
     }
 
-    @Override protected final Map<String, Object> 
esStreamDataToEsData(MemoryPoolMetric streamData) {
+    @Override
+    protected final Map<String, Object> esStreamDataToEsData(MemoryPoolMetric 
streamData) {
         Map<String, Object> source = new HashMap<>();
         source.put(MemoryPoolMetricTable.COLUMN_METRIC_ID, 
streamData.getMetricId());
 
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/InstanceRegisterEsDAO.java
 
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/InstanceRegisterEsDAO.java
index 074ec58..93a75dc 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/InstanceRegisterEsDAO.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/InstanceRegisterEsDAO.java
@@ -18,8 +18,6 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.register;
 
-import java.util.HashMap;
-import java.util.Map;
 import 
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import 
org.apache.skywalking.apm.collector.storage.dao.register.IInstanceRegisterDAO;
@@ -28,10 +26,13 @@ import 
org.apache.skywalking.apm.collector.storage.table.register.Instance;
 import 
org.apache.skywalking.apm.collector.storage.table.register.InstanceTable;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * @author peng-yongsheng
  */
@@ -43,15 +44,18 @@ public class InstanceRegisterEsDAO extends EsDAO implements 
IInstanceRegisterDAO
         super(client);
     }
 
-    @Override public int getMaxInstanceId() {
+    @Override
+    public int getMaxInstanceId() {
         return getMaxId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
     }
 
-    @Override public int getMinInstanceId() {
+    @Override
+    public int getMinInstanceId() {
         return getMinId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
     }
 
-    @Override public void save(Instance instance) {
+    @Override
+    public void save(Instance instance) {
         logger.debug("save instance register info, application 
getApplicationId: {}, agentUUID: {}", instance.getApplicationId(), 
instance.getAgentUUID());
         ElasticSearchClient client = getClient();
         Map<String, Object> source = new HashMap<>();
@@ -69,18 +73,14 @@ public class InstanceRegisterEsDAO extends EsDAO implements 
IInstanceRegisterDAO
         logger.debug("save instance register info, application 
getApplicationId: {}, agentUUID: {}, status: {}", instance.getApplicationId(), 
instance.getAgentUUID(), response.status().name());
     }
 
-    @Override public void updateHeartbeatTime(int instanceId, long 
heartbeatTime) {
-        ElasticSearchClient client = getClient();
-        UpdateRequest updateRequest = new UpdateRequest();
-        updateRequest.index(InstanceTable.TABLE);
-        updateRequest.type(InstanceTable.TABLE_TYPE);
-        updateRequest.id(String.valueOf(instanceId));
-        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-
+    @Override
+    public void updateHeartbeatTime(int instanceId, long heartbeatTime) {
+        UpdateRequestBuilder updateRequestBuilder = 
getClient().prepareUpdate(InstanceTable.TABLE, String.valueOf(instanceId));
+        
updateRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
         Map<String, Object> source = new HashMap<>();
         source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, 
TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartbeatTime));
+        updateRequestBuilder.setDoc(source);
 
-        updateRequest.doc(source);
-        client.update(updateRequest);
+        updateRequestBuilder.get();
     }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
 
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
index 5fdbee5..5f769b7 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
@@ -18,7 +18,6 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.ui;
 
-import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import 
org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.UnexpectedException;
@@ -27,9 +26,6 @@ import 
org.apache.skywalking.apm.collector.storage.dao.ui.IMemoryPoolMetricUIDAO
 import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
 import 
org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
 import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.get.MultiGetItemResponse;
-import org.elasticsearch.action.get.MultiGetRequestBuilder;
-import org.elasticsearch.action.get.MultiGetResponse;
 
 /**
  * @author peng-yongsheng
@@ -40,15 +36,16 @@ public class MemoryPoolMetricEsUIDAO extends EsDAO 
implements IMemoryPoolMetricU
         super(client);
     }
 
-    @Override public JsonObject getMetric(int instanceId, long timeBucket, int 
poolType) {
+    @Override
+    public JsonObject getMetric(int instanceId, long timeBucket, int poolType) 
{
         String id = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT 
+ poolType;
         GetResponse getResponse = 
getClient().prepareGet(MemoryPoolMetricTable.TABLE, id).get();
 
         JsonObject metric = new JsonObject();
         if (getResponse.isExists()) {
-            metric.addProperty("max", 
((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_MAX)).intValue());
-            metric.addProperty("init", 
((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_INIT)).intValue());
-            metric.addProperty("used", 
((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_USED)).intValue());
+            metric.addProperty("max", ((Number) 
getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_MAX)).intValue());
+            metric.addProperty("init", ((Number) 
getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_INIT)).intValue());
+            metric.addProperty("used", ((Number) 
getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_USED)).intValue());
         } else {
             metric.addProperty("max", 0);
             metric.addProperty("init", 0);
@@ -57,7 +54,8 @@ public class MemoryPoolMetricEsUIDAO extends EsDAO implements 
IMemoryPoolMetricU
         return metric;
     }
 
-    @Override public JsonObject getMetric(int instanceId, long 
startTimeBucket, long endTimeBucket, int poolType) {
-        throw new UnexpectedException("Not implement method");
+    @Override
+    public JsonObject getMetric(int instanceId, long startTimeBucket, long 
endTimeBucket, int poolType) {
+        throw new UnexpectedException("Not implement methodø");
     }
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to