This is an automated email from the ASF dual-hosted git repository.
pengys pushed a commit to branch 6.0
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/6.0 by this push:
new a5ad06c Refactor register and analysis modules. (#1539)
a5ad06c is described below
commit a5ad06ce46feca3585bae12e5b82be7095fe021f
Author: 彭勇升 pengys <[email protected]>
AuthorDate: Tue Aug 14 14:39:03 2018 +0800
Refactor register and analysis modules. (#1539)
* Refactor register and analysis modules.
* Fixed the startup error.
---
...ntLatencyAvgAggregateWorker.java => Const.java} | 31 +++---
.../skywalking/oap/server/core/CoreModule.java | 2 -
.../oap/server/core/CoreModuleProvider.java | 14 +--
.../server/core/analysis/DispatcherManager.java | 5 +-
.../core/analysis/endpoint/EndpointDispatcher.java | 18 +--
.../endpoint/EndpointLatencyAvgIndicator.java | 60 +++++-----
.../endpoint/EndpointLatencyAvgRemoteWorker.java | 43 -------
.../core/analysis/indicator/AvgIndicator.java | 3 +-
.../server/core/analysis/indicator/Indicator.java | 10 +-
.../indicator/annotation/IndicatorOperator.java} | 4 +-
.../indicator/annotation/IndicatorType.java | 6 +-
.../annotation/IndicatorTypeListener.java} | 23 ++--
...orWorker.java => IndicatorAggregateWorker.java} | 71 +++++-------
...tWorker.java => IndicatorPersistentWorker.java} | 59 +++++-----
.../core/analysis/worker/IndicatorProcess.java | 64 +++++++++++
...emoteWorker.java => IndicatorRemoteWorker.java} | 32 ++----
.../server/core/cache/EndpointCacheService.java | 95 ++++++++++++++++
.../RegisterSource.java} | 28 ++---
.../server/core/register/endpoint/Endpoint.java | 124 +++++++++++++++++++++
.../register/worker/RegisterDistinctWorker.java | 99 ++++++++++++++++
.../register/worker/RegisterPersistentWorker.java | 81 ++++++++++++++
.../worker/RegisterRemoteWorker.java} | 36 ++----
.../server/core/remote/RemoteServiceHandler.java | 8 +-
.../core/remote/client/RemoteClientManager.java | 4 +-
.../core/remote/client/SelfRemoteClient.java | 11 +-
.../oap/server/core/source/SourceReceiverImpl.java | 5 +-
.../{IPersistenceDAO.java => IIndicatorDAO.java} | 10 +-
.../{IPersistenceDAO.java => IRegisterDAO.java} | 12 +-
.../Worker.java => storage/StorageBuilder.java} | 12 +-
.../StorageDAO.java} | 9 +-
.../StorageData.java} | 7 +-
.../oap/server/core/storage/StorageModule.java | 2 +-
.../oap/server/core/storage/annotation/Column.java | 6 +-
.../annotation/{StorageEntity.java => Query.java} | 8 +-
.../annotation/StorageAnnotationListener.java | 4 +-
.../core/storage/annotation/StorageEntity.java | 3 +
...lumn.java => StorageEntityAnnotationUtils.java} | 26 ++++-
.../cache/IEndpointCacheDAO.java} | 13 ++-
.../oap/server/core/worker/AbstractWorker.java | 8 ++
...{AbstractWorker.java => WorkerIdGenerator.java} | 9 +-
.../{AbstractWorker.java => WorkerInstances.java} | 15 ++-
.../annotation/WorkerAnnotationContainer.java | 84 --------------
.../indicator/define/TestAvgIndicator.java | 15 +--
.../test/resources/META-INF/defines/indicator.def | 19 ----
.../src/test/resources/META-INF/defines/worker.def | 17 ---
.../StorageModuleElasticsearchProvider.java | 2 +-
.../storage/plugin/elasticsearch/base/EsDAO.java | 39 -------
.../{PersistenceEsDAO.java => IndicatorEsDAO.java} | 35 +++---
.../plugin/elasticsearch/base/RegisterEsDAO.java | 99 ++++++++++++++++
.../plugin/elasticsearch/base/StorageEsDAO.java} | 19 ++--
.../elasticsearch/cache/EndpointCacheEsDAO.java | 53 +++++++++
51 files changed, 908 insertions(+), 554 deletions(-)
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
similarity index 51%
rename from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
rename to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
index d5388c3..55779f9 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
@@ -16,23 +16,24 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.endpoint;
-
-import
org.apache.skywalking.oap.server.core.analysis.worker.AbstractAggregatorWorker;
-import org.apache.skywalking.oap.server.core.worker.annotation.Worker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+package org.apache.skywalking.oap.server.core;
/**
* @author peng-yongsheng
*/
-@Worker
-public class EndpointLatencyAvgAggregateWorker extends
AbstractAggregatorWorker<EndpointLatencyAvgIndicator> {
-
- public EndpointLatencyAvgAggregateWorker(ModuleManager moduleManager) {
- super(moduleManager);
- }
-
- @Override public Class nextWorkerClass() {
- return EndpointLatencyAvgRemoteWorker.class;
- }
+public class Const {
+ public static final int NONE = 0;
+ public static final String ID_SPLIT = "_";
+ public static final int NONE_APPLICATION_ID = 1;
+ public static final int NONE_INSTANCE_ID = 1;
+ public static final int NONE_SERVICE_ID = 1;
+ public static final String NONE_SERVICE_NAME = "None";
+ public static final String USER_CODE = "User";
+ public static final String SEGMENT_SPAN_SPLIT = "S";
+ public static final String UNKNOWN = "Unknown";
+ public static final String EXCEPTION = "Exception";
+ public static final String EMPTY_STRING = "";
+ public static final String FILE_SUFFIX = "sw";
+ public static final int SPAN_TYPE_VIRTUAL = 9;
+ public static final String DOMAIN_OPERATION_NAME = "{domain}";
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index 2b0860a..27d1da6 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -25,7 +25,6 @@ import
org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
-import
org.apache.skywalking.oap.server.core.worker.annotation.WorkerAnnotationContainer;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
@@ -56,7 +55,6 @@ public class CoreModule extends ModuleDefine {
private void addInsideService(List<Class> classes) {
classes.add(IModelGetter.class);
classes.add(StreamDataClassGetter.class);
- classes.add(WorkerAnnotationContainer.class);
classes.add(RemoteClientManager.class);
classes.add(RemoteSenderService.class);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 9b428f8..8e3d137 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core;
import java.io.IOException;
+import
org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.remote.*;
@@ -28,7 +29,6 @@ import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.*;
import
org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
-import org.apache.skywalking.oap.server.core.worker.annotation.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
@@ -48,9 +48,7 @@ public class CoreModuleProvider extends ModuleProvider {
private final AnnotationScan annotationScan;
private final StorageAnnotationListener storageAnnotationListener;
private final StreamAnnotationListener streamAnnotationListener;
- private final WorkerAnnotationListener workerAnnotationListener;
private final StreamDataAnnotationContainer streamDataAnnotationContainer;
- private final WorkerAnnotationContainer workerAnnotationContainer;
public CoreModuleProvider() {
super();
@@ -58,9 +56,7 @@ public class CoreModuleProvider extends ModuleProvider {
this.annotationScan = new AnnotationScan();
this.storageAnnotationListener = new StorageAnnotationListener();
this.streamAnnotationListener = new StreamAnnotationListener();
- this.workerAnnotationListener = new WorkerAnnotationListener();
this.streamDataAnnotationContainer = new
StreamDataAnnotationContainer();
- this.workerAnnotationContainer = new WorkerAnnotationContainer();
}
@Override public String name() {
@@ -85,10 +81,9 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(GRPCHandlerRegister.class, new
GRPCHandlerRegisterImpl(grpcServer));
this.registerServiceImplementation(JettyHandlerRegister.class, new
JettyHandlerRegisterImpl(jettyServer));
- this.registerServiceImplementation(SourceReceiver.class, new
SourceReceiverImpl(getManager()));
+ this.registerServiceImplementation(SourceReceiver.class, new
SourceReceiverImpl());
this.registerServiceImplementation(StreamDataClassGetter.class,
streamDataAnnotationContainer);
- this.registerServiceImplementation(WorkerAnnotationContainer.class,
workerAnnotationContainer);
this.registerServiceImplementation(RemoteClientManager.class, new
RemoteClientManager(getManager()));
this.registerServiceImplementation(RemoteSenderService.class, new
RemoteSenderService(getManager()));
@@ -96,7 +91,7 @@ public class CoreModuleProvider extends ModuleProvider {
annotationScan.registerListener(storageAnnotationListener);
annotationScan.registerListener(streamAnnotationListener);
- annotationScan.registerListener(workerAnnotationListener);
+ annotationScan.registerListener(new
IndicatorTypeListener(getManager()));
}
@Override public void start() throws ModuleStartException {
@@ -105,9 +100,8 @@ public class CoreModuleProvider extends ModuleProvider {
try {
annotationScan.scan(() -> {
streamDataAnnotationContainer.generate(streamAnnotationListener.getStreamClasses());
- workerAnnotationContainer.load(getManager(),
workerAnnotationListener.getWorkerClasses());
});
- } catch (WorkerDefineLoadException | IOException e) {
+ } catch (IOException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index 3dea269..4c0ab0f 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -21,7 +21,6 @@ package org.apache.skywalking.oap.server.core.analysis;
import java.util.*;
import
org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointDispatcher;
import org.apache.skywalking.oap.server.core.source.Scope;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
/**
@@ -33,9 +32,9 @@ public class DispatcherManager {
private Map<Scope, SourceDispatcher> dispatcherMap;
- public DispatcherManager(ModuleManager moduleManager) {
+ public DispatcherManager() {
this.dispatcherMap = new HashMap<>();
- this.dispatcherMap.put(Scope.Endpoint, new
EndpointDispatcher(moduleManager));
+ this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher());
}
public SourceDispatcher getDispatcher(Scope scope) {
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
index b7aef09..9bf8838 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
@@ -18,38 +18,24 @@
package org.apache.skywalking.oap.server.core.analysis.endpoint;
-import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
-import
org.apache.skywalking.oap.server.core.worker.annotation.WorkerAnnotationContainer;
+import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
import org.apache.skywalking.oap.server.core.source.Endpoint;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
public class EndpointDispatcher implements SourceDispatcher<Endpoint> {
- private final ModuleManager moduleManager;
- private EndpointLatencyAvgAggregateWorker avgAggregator;
-
- public EndpointDispatcher(ModuleManager moduleManager) {
- this.moduleManager = moduleManager;
- }
-
@Override public void dispatch(Endpoint source) {
avg(source);
}
private void avg(Endpoint source) {
- if (avgAggregator == null) {
- WorkerAnnotationContainer workerMapper =
moduleManager.find(CoreModule.NAME).getService(WorkerAnnotationContainer.class);
- avgAggregator =
(EndpointLatencyAvgAggregateWorker)workerMapper.findInstanceByClass(EndpointLatencyAvgAggregateWorker.class);
- }
-
EndpointLatencyAvgIndicator indicator = new
EndpointLatencyAvgIndicator();
indicator.setId(source.getId());
indicator.setTimeBucket(source.getTimeBucket());
indicator.combine(source.getLatency(), 1);
- avgAggregator.in(indicator);
+ IndicatorProcess.INSTANCE.in(indicator);
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
index 6872acb..718d357 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
@@ -20,20 +20,21 @@ package
org.apache.skywalking.oap.server.core.analysis.endpoint;
import java.util.*;
import lombok.*;
-import org.apache.skywalking.oap.server.core.analysis.indicator.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
+import
org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
/**
* @author peng-yongsheng
*/
+@IndicatorType
@StreamData
-@StorageEntity(name = EndpointLatencyAvgIndicator.NAME)
+@StorageEntity(name = "endpoint_latency_avg", builder =
EndpointLatencyAvgIndicator.Builder.class)
public class EndpointLatencyAvgIndicator extends AvgIndicator {
- public static final String NAME = "endpoint_latency_avg";
-
private static final String ID = "id";
private static final String SERVICE_ID = "service_id";
private static final String SERVICE_INSTANCE_ID = "service_instance_id";
@@ -42,10 +43,6 @@ public class EndpointLatencyAvgIndicator extends
AvgIndicator {
@Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
@Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) private int
serviceInstanceId;
- @Override public String name() {
- return NAME;
- }
-
@Override public String id() {
return String.valueOf(id);
}
@@ -99,27 +96,30 @@ public class EndpointLatencyAvgIndicator extends
AvgIndicator {
setValue(remoteData.getDataLongs(2));
}
- @Override public Map<String, Object> toMap() {
- Map<String, Object> map = new HashMap<>();
- map.put(ID, id);
- map.put(SERVICE_ID, serviceId);
- map.put(SERVICE_INSTANCE_ID, serviceInstanceId);
- map.put(COUNT, getCount());
- map.put(SUMMATION, getSummation());
- map.put(VALUE, getValue());
- map.put(TIME_BUCKET, getTimeBucket());
- return map;
- }
-
- @Override public Indicator newOne(Map<String, Object> dbMap) {
- EndpointLatencyAvgIndicator indicator = new
EndpointLatencyAvgIndicator();
- indicator.setId((Integer)dbMap.get(ID));
- indicator.setServiceId((Integer)dbMap.get(SERVICE_ID));
-
indicator.setServiceInstanceId((Integer)dbMap.get(SERVICE_INSTANCE_ID));
- indicator.setCount((Integer)dbMap.get(COUNT));
- indicator.setSummation((Long)dbMap.get(SUMMATION));
- indicator.setValue((Long)dbMap.get(VALUE));
- indicator.setTimeBucket((Long)dbMap.get(TIME_BUCKET));
- return indicator;
+ public static class Builder implements
StorageBuilder<EndpointLatencyAvgIndicator> {
+
+ @Override public EndpointLatencyAvgIndicator map2Data(Map<String,
Object> dbMap) {
+ EndpointLatencyAvgIndicator indicator = new
EndpointLatencyAvgIndicator();
+ indicator.setId((Integer)dbMap.get(ID));
+ indicator.setServiceId((Integer)dbMap.get(SERVICE_ID));
+
indicator.setServiceInstanceId((Integer)dbMap.get(SERVICE_INSTANCE_ID));
+ indicator.setCount((Integer)dbMap.get(COUNT));
+ indicator.setSummation((Long)dbMap.get(SUMMATION));
+ indicator.setValue((Long)dbMap.get(VALUE));
+ indicator.setTimeBucket((Long)dbMap.get(TIME_BUCKET));
+ return indicator;
+ }
+
+ @Override public Map<String, Object>
data2Map(EndpointLatencyAvgIndicator storageData) {
+ Map<String, Object> map = new HashMap<>();
+ map.put(ID, storageData.getId());
+ map.put(SERVICE_ID, storageData.getServiceId());
+ map.put(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId());
+ map.put(COUNT, storageData.getCount());
+ map.put(SUMMATION, storageData.getSummation());
+ map.put(VALUE, storageData.getValue());
+ map.put(TIME_BUCKET, storageData.getTimeBucket());
+ return map;
+ }
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
deleted file mode 100644
index f36483a..0000000
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.endpoint;
-
-import
org.apache.skywalking.oap.server.core.analysis.worker.AbstractRemoteWorker;
-import org.apache.skywalking.oap.server.core.remote.selector.Selector;
-import org.apache.skywalking.oap.server.core.worker.annotation.Worker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-
-/**
- * @author peng-yongsheng
- */
-@Worker
-public class EndpointLatencyAvgRemoteWorker extends
AbstractRemoteWorker<EndpointLatencyAvgIndicator> {
-
- public EndpointLatencyAvgRemoteWorker(ModuleManager moduleManager) {
- super(moduleManager);
- }
-
- @Override public Selector selector() {
- return Selector.HashCode;
- }
-
- @Override public Class nextWorkerClass() {
- return EndpointLatencyAvgPersistentWorker.class;
- }
-}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
index 1bae8ec..41f24c8 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
@@ -20,13 +20,12 @@ package
org.apache.skywalking.oap.server.core.analysis.indicator;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
-import org.apache.skywalking.oap.server.core.remote.selector.Selector;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
* @author peng-yongsheng
*/
-@IndicatorType(selector = Selector.HashCode, needMerge = true)
+@IndicatorOperator
public abstract class AvgIndicator extends Indicator {
protected static final String SUMMATION = "summation";
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
index 439dee5..fe29ed6 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
@@ -18,15 +18,15 @@
package org.apache.skywalking.oap.server.core.analysis.indicator;
-import java.util.Map;
import lombok.*;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
* @author peng-yongsheng
*/
-public abstract class Indicator extends StreamData {
+public abstract class Indicator extends StreamData implements StorageData {
protected static final String TIME_BUCKET = "time_bucket";
@@ -35,10 +35,4 @@ public abstract class Indicator extends StreamData {
public abstract String id();
public abstract void combine(Indicator indicator);
-
- public abstract String name();
-
- public abstract Map<String, Object> toMap();
-
- public abstract Indicator newOne(Map<String, Object> dbMap);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/Worker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorOperator.java
similarity index 89%
copy from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/Worker.java
copy to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorOperator.java
index bc4c495..60ee119 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/Worker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorOperator.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.core.worker.annotation;
+package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
import java.lang.annotation.*;
@@ -25,5 +25,5 @@ import java.lang.annotation.*;
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
-public @interface Worker {
+public @interface IndicatorOperator {
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
index d1ad273..5543257 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
@@ -19,15 +19,11 @@
package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
import java.lang.annotation.*;
-import org.apache.skywalking.oap.server.core.remote.selector.Selector;
/**
* @author peng-yongsheng
*/
@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.SOURCE)
+@Retention(RetentionPolicy.RUNTIME)
public @interface IndicatorType {
- Selector selector();
-
- boolean needMerge();
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerAnnotationListener.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorTypeListener.java
similarity index 64%
rename from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerAnnotationListener.java
rename to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorTypeListener.java
index 51f5875..bffcbbf 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerAnnotationListener.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorTypeListener.java
@@ -16,34 +16,29 @@
*
*/
-package org.apache.skywalking.oap.server.core.worker.annotation;
+package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
import java.lang.annotation.Annotation;
-import java.util.*;
-import lombok.Getter;
+import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
-public class WorkerAnnotationListener implements AnnotationListener {
+public class IndicatorTypeListener implements AnnotationListener {
- private static final Logger logger =
LoggerFactory.getLogger(WorkerAnnotationListener.class);
+ private final ModuleManager moduleManager;
- @Getter private final List<Class> workerClasses;
-
- public WorkerAnnotationListener() {
- this.workerClasses = new LinkedList<>();
+ public IndicatorTypeListener(ModuleManager moduleManager) {
+ this.moduleManager = moduleManager;
}
@Override public Class<? extends Annotation> annotation() {
- return Worker.class;
+ return IndicatorType.class;
}
@Override public void notify(Class aClass) {
- logger.info("The owner class of worker annotation, class name: {}",
aClass.getName());
-
- workerClasses.add(aClass);
+ IndicatorProcess.INSTANCE.create(moduleManager, aClass);
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
similarity index 54%
rename from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
rename to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index 3e420c9..8d0bf88 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -21,44 +21,41 @@ package
org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
-import
org.apache.skywalking.oap.server.core.worker.annotation.WorkerAnnotationContainer;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
-public abstract class AbstractAggregatorWorker<INPUT extends Indicator>
extends AbstractWorker<INPUT> {
+public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
- private static final Logger logger =
LoggerFactory.getLogger(AbstractAggregatorWorker.class);
+ private static final Logger logger =
LoggerFactory.getLogger(IndicatorAggregateWorker.class);
- private AbstractWorker worker;
- private final ModuleManager moduleManager;
- private final DataCarrier<INPUT> dataCarrier;
- private final MergeDataCache<INPUT> mergeDataCache;
+ private AbstractWorker<Indicator> nextWorker;
+ private final DataCarrier<Indicator> dataCarrier;
+ private final MergeDataCache<Indicator> mergeDataCache;
private int messageNum;
- public AbstractAggregatorWorker(ModuleManager moduleManager) {
- this.moduleManager = moduleManager;
+ IndicatorAggregateWorker(int workerId, AbstractWorker<Indicator>
nextWorker) {
+ super(workerId);
+ this.nextWorker = nextWorker;
this.mergeDataCache = new MergeDataCache<>();
this.dataCarrier = new DataCarrier<>(1, 10000);
this.dataCarrier.consume(new AggregatorConsumer(this), 1);
}
- @Override public final void in(INPUT input) {
- input.setEndOfBatchContext(new EndOfBatchContext(false));
- dataCarrier.produce(input);
+ @Override public final void in(Indicator indicator) {
+ indicator.setEndOfBatchContext(new EndOfBatchContext(false));
+ dataCarrier.produce(indicator);
}
- private void onWork(INPUT message) {
+ private void onWork(Indicator indicator) {
messageNum++;
- aggregate(message);
+ aggregate(indicator);
- if (messageNum >= 1000 ||
message.getEndOfBatchContext().isEndOfBatch()) {
+ if (messageNum >= 1000 ||
indicator.getEndOfBatchContext().isEndOfBatch()) {
sendToNext();
messageNum = 0;
}
@@ -74,41 +71,31 @@ public abstract class AbstractAggregatorWorker<INPUT
extends Indicator> extends
}
}
- mergeDataCache.getLast().collection().forEach((INPUT key, INPUT data)
-> {
+ mergeDataCache.getLast().collection().forEach((Indicator key,
Indicator data) -> {
if (logger.isDebugEnabled()) {
logger.debug(data.toString());
}
- onNext(data);
+ nextWorker.in(data);
});
mergeDataCache.finishReadingLast();
}
- private void onNext(INPUT data) {
- if (worker == null) {
- WorkerAnnotationContainer workerMapper =
moduleManager.find(CoreModule.NAME).getService(WorkerAnnotationContainer.class);
- worker = workerMapper.findInstanceByClass(nextWorkerClass());
- }
- worker.in(data);
- }
-
- public abstract Class nextWorkerClass();
-
- private void aggregate(INPUT message) {
+ private void aggregate(Indicator indicator) {
mergeDataCache.writing();
- if (mergeDataCache.containsKey(message)) {
- mergeDataCache.get(message).combine(message);
+ if (mergeDataCache.containsKey(indicator)) {
+ mergeDataCache.get(indicator).combine(indicator);
} else {
- mergeDataCache.put(message);
+ mergeDataCache.put(indicator);
}
mergeDataCache.finishWriting();
}
- private class AggregatorConsumer implements IConsumer<INPUT> {
+ private class AggregatorConsumer implements IConsumer<Indicator> {
- private final AbstractAggregatorWorker<INPUT> aggregator;
+ private final IndicatorAggregateWorker aggregator;
- private AggregatorConsumer(AbstractAggregatorWorker<INPUT> aggregator)
{
+ private AggregatorConsumer(IndicatorAggregateWorker aggregator) {
this.aggregator = aggregator;
}
@@ -116,21 +103,21 @@ public abstract class AbstractAggregatorWorker<INPUT
extends Indicator> extends
}
- @Override public void consume(List<INPUT> data) {
- Iterator<INPUT> inputIterator = data.iterator();
+ @Override public void consume(List<Indicator> data) {
+ Iterator<Indicator> inputIterator = data.iterator();
int i = 0;
while (inputIterator.hasNext()) {
- INPUT input = inputIterator.next();
+ Indicator indicator = inputIterator.next();
i++;
if (i == data.size()) {
- input.getEndOfBatchContext().setEndOfBatch(true);
+ indicator.getEndOfBatchContext().setEndOfBatch(true);
}
- aggregator.onWork(input);
+ aggregator.onWork(indicator);
}
}
- @Override public void onError(List<INPUT> data, Throwable t) {
+ @Override public void onError(List<Indicator> data, Throwable t) {
logger.error(t.getMessage(), t);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
similarity index 68%
rename from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
rename to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index a8bee9f..dd140c8 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -31,26 +31,31 @@ import static java.util.Objects.nonNull;
/**
* @author peng-yongsheng
*/
-public abstract class AbstractPersistentWorker<INPUT extends Indicator>
extends AbstractWorker<INPUT> {
+public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
- private static final Logger logger =
LoggerFactory.getLogger(AbstractPersistentWorker.class);
+ private static final Logger logger =
LoggerFactory.getLogger(IndicatorPersistentWorker.class);
- private final MergeDataCache<INPUT> mergeDataCache;
+ private final String modelName;
+ private final MergeDataCache<Indicator> mergeDataCache;
private final IBatchDAO batchDAO;
- private final IPersistenceDAO<?, ?, INPUT> persistenceDAO;
- private final int blockBatchPersistenceSize = 1000;
-
- public AbstractPersistentWorker(ModuleManager moduleManager) {
+ private final IIndicatorDAO indicatorDAO;
+ private final int blockBatchPersistenceSize;
+
+ IndicatorPersistentWorker(int workerId, String modelName, int batchSize,
ModuleManager moduleManager,
+ IIndicatorDAO indicatorDAO) {
+ super(workerId);
+ this.modelName = modelName;
+ this.blockBatchPersistenceSize = batchSize;
this.mergeDataCache = new MergeDataCache<>();
this.batchDAO =
moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
- this.persistenceDAO =
moduleManager.find(StorageModule.NAME).getService(IPersistenceDAO.class);
+ this.indicatorDAO = indicatorDAO;
}
- public final Window<MergeDataCollection<INPUT>> getCache() {
+ public final Window<MergeDataCollection<Indicator>> getCache() {
return mergeDataCache;
}
- @Override public final void in(INPUT input) {
+ @Override public final void in(Indicator input) {
if (getCache().currentCollectionSize() >= blockBatchPersistenceSize) {
try {
if (getCache().trySwitchPointer()) {
@@ -86,33 +91,25 @@ public abstract class AbstractPersistentWorker<INPUT
extends Indicator> extends
return batchCollection;
}
- private List<Object> prepareBatch(MergeDataCollection<INPUT> collection) {
+ private List<Object> prepareBatch(MergeDataCollection<Indicator>
collection) {
List<Object> batchCollection = new LinkedList<>();
collection.collection().forEach((id, data) -> {
- if (needMergeDBData()) {
- INPUT dbData = null;
+ Indicator dbData = null;
+ try {
+ dbData = indicatorDAO.get(modelName, data);
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
+ }
+ if (nonNull(dbData)) {
+ dbData.combine(data);
try {
- dbData = persistenceDAO.get(data);
+
batchCollection.add(indicatorDAO.prepareBatchUpdate(modelName, dbData));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
- if (nonNull(dbData)) {
- dbData.combine(data);
- try {
-
batchCollection.add(persistenceDAO.prepareBatchUpdate(dbData));
- } catch (Throwable t) {
- logger.error(t.getMessage(), t);
- }
- } else {
- try {
-
batchCollection.add(persistenceDAO.prepareBatchInsert(data));
- } catch (Throwable t) {
- logger.error(t.getMessage(), t);
- }
- }
} else {
try {
-
batchCollection.add(persistenceDAO.prepareBatchInsert(data));
+
batchCollection.add(indicatorDAO.prepareBatchInsert(modelName, data));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
@@ -122,7 +119,7 @@ public abstract class AbstractPersistentWorker<INPUT
extends Indicator> extends
return batchCollection;
}
- private void cacheData(INPUT input) {
+ private void cacheData(Indicator input) {
mergeDataCache.writing();
if (mergeDataCache.containsKey(input)) {
mergeDataCache.get(input).combine(input);
@@ -132,6 +129,4 @@ public abstract class AbstractPersistentWorker<INPUT
extends Indicator> extends
mergeDataCache.finishWriting();
}
-
- protected abstract boolean needMergeDBData();
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
new file mode 100644
index 0000000..398d61d
--- /dev/null
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import java.util.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.storage.*;
+import
org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
+import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * @author peng-yongsheng
+ */
+public enum IndicatorProcess {
+    INSTANCE;
+
+    /** Entry (aggregate) worker per indicator class; filled by {@link #create} and read by {@link #in}. */
+    private final Map<Class<? extends Indicator>, IndicatorAggregateWorker> entryWorkers = new HashMap<>();
+
+    /**
+     * Hands the indicator to the aggregate worker registered for its concrete class.
+     *
+     * @param indicator the indicator to process
+     * @throws UnexpectedException if no worker chain was created for the indicator's class
+     */
+    public void in(Indicator indicator) {
+        IndicatorAggregateWorker entryWorker = entryWorkers.get(indicator.getClass());
+        if (entryWorker == null) {
+            // Fail with the offending class name instead of an anonymous NullPointerException.
+            throw new UnexpectedException("No indicator worker registered for " + indicator.getClass().getName());
+        }
+        entryWorker.in(indicator);
+    }
+
+    /**
+     * Builds the worker chain for one indicator class and registers its entry point.
+     * The chain is aggregate worker, then remote worker, then persistent worker; each worker is
+     * constructed with its successor and is published in {@code WorkerInstances} under a generated id.
+     *
+     * @param moduleManager used to look up the storage module services
+     * @param indicatorClass the indicator class the chain is created for
+     * @throws UnexpectedException if the storage builder declared for the class cannot be instantiated
+     */
+    public void create(ModuleManager moduleManager, Class<? extends Indicator> indicatorClass) {
+        String modelName = StorageEntityAnnotationUtils.getModelName(indicatorClass);
+        Class<? extends StorageBuilder> builderClass = StorageEntityAnnotationUtils.getBuilder(indicatorClass);
+
+        StorageDAO storageDAO = moduleManager.find(StorageModule.NAME).getService(StorageDAO.class);
+        IIndicatorDAO indicatorDAO;
+        try {
+            indicatorDAO = storageDAO.newIndicatorDao(builderClass.newInstance());
+        } catch (InstantiationException | IllegalAccessException e) {
+            // Keep both a meaningful message and the original cause for diagnosis.
+            UnexpectedException failure = new UnexpectedException(
+                "Cannot instantiate storage builder " + builderClass.getName() + " for model " + modelName);
+            failure.initCause(e);
+            throw failure;
+        }
+
+        // The persistent worker is the tail of the chain; 1000 is passed as its batch size.
+        IndicatorPersistentWorker persistentWorker = new IndicatorPersistentWorker(
+            WorkerIdGenerator.INSTANCES.generate(), modelName, 1000, moduleManager, indicatorDAO);
+        WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
+
+        IndicatorRemoteWorker remoteWorker = new IndicatorRemoteWorker(
+            WorkerIdGenerator.INSTANCES.generate(), moduleManager, persistentWorker);
+        WorkerInstances.INSTANCES.put(remoteWorker.getWorkerId(), remoteWorker);
+
+        IndicatorAggregateWorker aggregateWorker = new IndicatorAggregateWorker(
+            WorkerIdGenerator.INSTANCES.generate(), remoteWorker);
+        WorkerInstances.INSTANCES.put(aggregateWorker.getWorkerId(), aggregateWorker);
+
+        entryWorkers.put(indicatorClass, aggregateWorker);
+    }
+}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java
similarity index 59%
copy from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
copy to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java
index fc9b48d..0295864 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java
@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
-import
org.apache.skywalking.oap.server.core.worker.annotation.WorkerAnnotationContainer;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.selector.Selector;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
@@ -30,35 +29,24 @@ import org.slf4j.*;
/**
* @author peng-yongsheng
*/
-public abstract class AbstractRemoteWorker<INPUT extends Indicator> extends
AbstractWorker<INPUT> {
+public class IndicatorRemoteWorker extends AbstractWorker<Indicator> {
- private static final Logger logger =
LoggerFactory.getLogger(AbstractRemoteWorker.class);
+ private static final Logger logger =
LoggerFactory.getLogger(IndicatorRemoteWorker.class);
- private final ModuleManager moduleManager;
- private RemoteSenderService remoteSender;
- private WorkerAnnotationContainer workerMapper;
+ private final AbstractWorker<Indicator> nextWorker;
+ private final RemoteSenderService remoteSender;
- public AbstractRemoteWorker(ModuleManager moduleManager) {
- this.moduleManager = moduleManager;
+ IndicatorRemoteWorker(int workerId, ModuleManager moduleManager,
AbstractWorker<Indicator> nextWorker) {
+ super(workerId);
+ this.remoteSender =
moduleManager.find(CoreModule.NAME).getService(RemoteSenderService.class);
+ this.nextWorker = nextWorker;
}
- @Override public final void in(INPUT input) {
- if (remoteSender == null) {
- remoteSender =
moduleManager.find(CoreModule.NAME).getService(RemoteSenderService.class);
- }
- if (workerMapper == null) {
- workerMapper =
moduleManager.find(CoreModule.NAME).getService(WorkerAnnotationContainer.class);
- }
-
+ @Override public final void in(Indicator indicator) {
try {
- int nextWorkerId = workerMapper.findIdByClass(nextWorkerClass());
- remoteSender.send(nextWorkerId, input, selector());
+ remoteSender.send(nextWorker.getWorkerId(), indicator,
Selector.HashCode);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
-
- public abstract Class nextWorkerClass();
-
- public abstract Selector selector();
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/EndpointCacheService.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/EndpointCacheService.java
new file mode 100644
index 0000000..71e5814
--- /dev/null
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/EndpointCacheService.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.cache;
+
+import com.google.common.cache.*;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.register.endpoint.Endpoint;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.cache.IEndpointCacheDAO;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.slf4j.*;
+
+import static java.util.Objects.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class EndpointCacheService implements Service {
+
+    private static final Logger logger = LoggerFactory.getLogger(EndpointCacheService.class);
+
+    private final ModuleManager moduleManager;
+    /** Resolved on first use through {@link #getCacheDAO()}. */
+    private IEndpointCacheDAO cacheDAO;
+
+    /** Endpoint id keyed by the composite string id built in {@link #get(int, String, int)}. */
+    private final Cache<String, Integer> idCache =
+        CacheBuilder.newBuilder().initialCapacity(1000).maximumSize(1000000).build();
+
+    /** Endpoint keyed by its numeric id. */
+    private final Cache<Integer, Endpoint> sequenceCache =
+        CacheBuilder.newBuilder().initialCapacity(1000).maximumSize(1000000).build();
+
+    /**
+     * @param moduleManager used to look up the storage module's {@link IEndpointCacheDAO} lazily
+     */
+    public EndpointCacheService(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+    }
+
+    /**
+     * Resolves the endpoint id for the given service id, name and source span type.
+     * The cache is consulted first. If that yields 0, or the cache lookup fails, the storage is
+     * queried directly and a non-zero answer is put into the cache.
+     *
+     * @return the endpoint id, or 0 when neither the cache nor the storage knows the endpoint
+     */
+    public int get(int serviceId, String serviceName, int srcSpanType) {
+        String id = serviceId + Const.ID_SPLIT + serviceName + Const.ID_SPLIT + srcSpanType;
+
+        int endpointId = 0;
+        try {
+            endpointId = idCache.get(id, () -> getCacheDAO().get(id));
+        } catch (Throwable e) {
+            logger.error(e.getMessage(), e);
+        }
+
+        // Fall back on the looked-up id (not on the caller's service id): a 0 result means
+        // "unknown so far", so ask the storage again and refresh the cache once it is known.
+        if (endpointId == 0) {
+            endpointId = getCacheDAO().get(id);
+            if (endpointId != 0) {
+                idCache.put(id, endpointId);
+            }
+        }
+        return endpointId;
+    }
+
+    /**
+     * Resolves the endpoint registered under the given id, consulting the cache first and the
+     * storage second; a storage hit is put into the cache.
+     *
+     * @return the endpoint, or {@code null} (after logging a warning) when it cannot be found
+     */
+    public Endpoint get(int endpointId) {
+        Endpoint endpoint = null;
+        try {
+            endpoint = sequenceCache.get(endpointId, () -> getCacheDAO().get(endpointId));
+        } catch (Throwable e) {
+            logger.error(e.getMessage(), e);
+        }
+
+        if (isNull(endpoint)) {
+            endpoint = getCacheDAO().get(endpointId);
+            if (nonNull(endpoint)) {
+                sequenceCache.put(endpointId, endpoint);
+            } else {
+                logger.warn("Endpoint id {} is not in cache and persistent storage.", endpointId);
+            }
+        }
+
+        return endpoint;
+    }
+
+    /** Looks the DAO up in the storage module on first call and reuses it afterwards. */
+    private IEndpointCacheDAO getCacheDAO() {
+        if (isNull(cacheDAO)) {
+            cacheDAO = moduleManager.find(StorageModule.NAME).getService(IEndpointCacheDAO.class);
+        }
+        return cacheDAO;
+    }
+}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
similarity index 54%
copy from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
copy to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
index 439dee5..abb6c9e 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
@@ -16,29 +16,29 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.indicator;
+package org.apache.skywalking.oap.server.core.register;
-import java.util.Map;
import lombok.*;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
* @author peng-yongsheng
*/
-public abstract class Indicator extends StreamData {
+public abstract class RegisterSource extends StreamData implements StorageData
{
- protected static final String TIME_BUCKET = "time_bucket";
+ public static final String SEQUENCE = "sequence";
+ protected static final String REGISTER_TIME = "register_time";
+ protected static final String HEARTBEAT_TIME = "heartbeat_time";
- @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
+ @Getter @Setter @Column(columnName = SEQUENCE) private int sequence;
+ @Getter @Setter @Column(columnName = REGISTER_TIME) private long
registerTime;
+ @Getter @Setter @Column(columnName = HEARTBEAT_TIME) private long
heartbeatTime;
- public abstract String id();
-
- public abstract void combine(Indicator indicator);
-
- public abstract String name();
-
- public abstract Map<String, Object> toMap();
-
- public abstract Indicator newOne(Map<String, Object> dbMap);
+ public final void combine(RegisterSource registerSource) {
+ if (heartbeatTime < registerSource.getHeartbeatTime()) {
+ heartbeatTime = registerSource.getHeartbeatTime();
+ }
+ }
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/endpoint/Endpoint.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/endpoint/Endpoint.java
new file mode 100644
index 0000000..8536955
--- /dev/null
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/endpoint/Endpoint.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.register.endpoint;
+
+import java.util.*;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
+
+/**
+ * @author peng-yongsheng
+ */
+@StreamData
+@StorageEntity(name = "endpoint", builder = Endpoint.Builder.class)
+public class Endpoint extends RegisterSource {
+
+    private static final String SERVICE_ID = "service_id";
+    private static final String NAME = "name";
+    private static final String SRC_SPAN_TYPE = "src_span_type";
+
+    @Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
+    @Setter @Getter @Column(columnName = NAME, matchQuery = true) private String name;
+    @Setter @Getter @Column(columnName = SRC_SPAN_TYPE) private int srcSpanType;
+
+    /**
+     * @return service id, name and source span type joined with {@link Const#ID_SPLIT}
+     */
+    @Override public String id() {
+        return String.valueOf(serviceId) + Const.ID_SPLIT + name + Const.ID_SPLIT + String.valueOf(srcSpanType);
+    }
+
+    /** Hash over the same three fields {@link #equals(Object)} compares; tolerates a null name. */
+    @Override public int hashCode() {
+        int result = 17;
+        result = 31 * result + serviceId;
+        result = 31 * result + Objects.hashCode(name);
+        result = 31 * result + srcSpanType;
+        return result;
+    }
+
+    /** Two endpoints are equal when service id, name and source span type all match. */
+    @Override public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        Endpoint other = (Endpoint)obj;
+        return serviceId == other.getServiceId()
+            && Objects.equals(name, other.getName())
+            && srcSpanType == other.getSrcSpanType();
+    }
+
+    /**
+     * Writes the fields into a new remote-data builder. Values are appended, so the order of the
+     * calls defines the positions {@link #deserialize(RemoteData)} reads back:
+     * integers [sequence, serviceId, srcSpanType], longs [registerTime, heartbeatTime], strings [name].
+     */
+    @Override public RemoteData.Builder serialize() {
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+        // The indexed setters only replace existing elements; a fresh builder has none, so append.
+        remoteBuilder.addDataIntegers(getSequence());
+        remoteBuilder.addDataIntegers(serviceId);
+        remoteBuilder.addDataIntegers(srcSpanType);
+
+        remoteBuilder.addDataLongs(getRegisterTime());
+        remoteBuilder.addDataLongs(getHeartbeatTime());
+
+        remoteBuilder.addDataStrings(name);
+        return remoteBuilder;
+    }
+
+    /** Restores the fields from the positions written by {@link #serialize()}. */
+    @Override public void deserialize(RemoteData remoteData) {
+        setSequence(remoteData.getDataIntegers(0));
+        setServiceId(remoteData.getDataIntegers(1));
+        setSrcSpanType(remoteData.getDataIntegers(2));
+
+        setRegisterTime(remoteData.getDataLongs(0));
+        setHeartbeatTime(remoteData.getDataLongs(1));
+
+        // The name is the only string and therefore sits at index 0.
+        setName(remoteData.getDataStrings(0));
+    }
+
+    /** Converts between {@link Endpoint} and the column-name keyed map used by the storage layer. */
+    public static class Builder implements StorageBuilder<Endpoint> {
+
+        /**
+         * @param dbMap column name to value; every numeric column must be present
+         * @return a new endpoint populated from the map
+         */
+        @Override public Endpoint map2Data(Map<String, Object> dbMap) {
+            Endpoint endpoint = new Endpoint();
+            // Go through Number so the conversion does not depend on the exact boxed type in the map.
+            endpoint.setSequence(((Number)dbMap.get(SEQUENCE)).intValue());
+            endpoint.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
+            endpoint.setName((String)dbMap.get(NAME));
+            endpoint.setSrcSpanType(((Number)dbMap.get(SRC_SPAN_TYPE)).intValue());
+            endpoint.setRegisterTime(((Number)dbMap.get(REGISTER_TIME)).longValue());
+            endpoint.setHeartbeatTime(((Number)dbMap.get(HEARTBEAT_TIME)).longValue());
+            return endpoint;
+        }
+
+        /**
+         * @param storageData the endpoint to convert
+         * @return a new map holding one entry per persisted column
+         */
+        @Override public Map<String, Object> data2Map(Endpoint storageData) {
+            Map<String, Object> map = new HashMap<>();
+            map.put(SEQUENCE, storageData.getSequence());
+            map.put(SERVICE_ID, storageData.getServiceId());
+            map.put(NAME, storageData.getName());
+            map.put(SRC_SPAN_TYPE, storageData.getSrcSpanType());
+            map.put(REGISTER_TIME, storageData.getRegisterTime());
+            map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime());
+            return map;
+        }
+    }
+}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
new file mode 100644
index 0000000..640e159
--- /dev/null
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.register.worker;
+
+import java.util.*;
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
+
+    private static final Logger logger = LoggerFactory.getLogger(RegisterDistinctWorker.class);
+
+    private final AbstractWorker<RegisterSource> nextWorker;
+    private final DataCarrier<RegisterSource> dataCarrier;
+    /** Distinct sources seen since the last flush, keyed by themselves (equals/hashCode). */
+    private final Map<RegisterSource, RegisterSource> sources;
+    /** Number of sources handled since the last flush. */
+    private int messageNum;
+
+    /**
+     * @param workerId id of this worker
+     * @param nextWorker worker that receives the de-duplicated sources on every flush
+     */
+    public RegisterDistinctWorker(int workerId, AbstractWorker<RegisterSource> nextWorker) {
+        super(workerId);
+        this.nextWorker = nextWorker;
+        this.sources = new HashMap<>();
+        this.dataCarrier = new DataCarrier<>(1, 10000);
+        this.dataCarrier.consume(new AggregatorConsumer(this), 1);
+    }
+
+    /** Marks the source as "not end of batch" and queues it; the consumer sets the flag on the last element. */
+    @Override public final void in(RegisterSource source) {
+        source.setEndOfBatchContext(new EndOfBatchContext(false));
+        dataCarrier.produce(source);
+    }
+
+    /**
+     * Buffers the source, merging it into an equal one that is already buffered, and flushes the
+     * buffer to the next worker after 1000 messages or at the end of a consumed batch.
+     */
+    private void onWork(RegisterSource source) {
+        messageNum++;
+
+        // First occurrence is stored; later equal occurrences are merged into the stored one.
+        RegisterSource buffered = sources.putIfAbsent(source, source);
+        if (buffered != null) {
+            buffered.combine(source);
+        }
+
+        if (messageNum >= 1000 || source.getEndOfBatchContext().isEndOfBatch()) {
+            sources.values().forEach(nextWorker::in);
+            // Start the next round empty so flushed sources are neither re-sent nor retained forever.
+            sources.clear();
+            messageNum = 0;
+        }
+    }
+
+    /** Data-carrier consumer that feeds every queued source into {@link #onWork(RegisterSource)}. */
+    private class AggregatorConsumer implements IConsumer<RegisterSource> {
+
+        private final RegisterDistinctWorker aggregator;
+
+        private AggregatorConsumer(RegisterDistinctWorker aggregator) {
+            this.aggregator = aggregator;
+        }
+
+        @Override public void init() {
+        }
+
+        /** Processes the batch in order, flagging its last element as end of batch. */
+        @Override public void consume(List<RegisterSource> batch) {
+            int remaining = batch.size();
+            for (RegisterSource source : batch) {
+                remaining--;
+                if (remaining == 0) {
+                    source.getEndOfBatchContext().setEndOfBatch(true);
+                }
+                aggregator.onWork(source);
+            }
+        }
+
+        @Override public void onError(List<RegisterSource> batch, Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+
+        @Override public void onExit() {
+        }
+    }
+}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
new file mode 100644
index 0000000..0e35373
--- /dev/null
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.register.worker;
+
+import java.util.*;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.oap.server.core.source.Scope;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
+
+    private static final Logger logger = LoggerFactory.getLogger(RegisterPersistentWorker.class);
+
+    private final Scope scope;
+    private final String modelName;
+    /** Distinct sources waiting to be written, keyed by themselves (equals/hashCode). */
+    private final Map<RegisterSource, RegisterSource> sources;
+    private final IRegisterLockDAO registerLockDAO;
+    private final IRegisterDAO registerDAO;
+
+    /**
+     * @param workerId id of this worker
+     * @param modelName storage model name passed to every {@link IRegisterDAO} call
+     * @param moduleManager used to look up the storage module's {@link IRegisterLockDAO}
+     * @param registerDAO DAO used to read, insert and update register records
+     * @param scope scope passed to the lock DAO when locking and unlocking
+     */
+    public RegisterPersistentWorker(int workerId, String modelName, ModuleManager moduleManager,
+        IRegisterDAO registerDAO, Scope scope) {
+        super(workerId);
+        this.modelName = modelName;
+        this.sources = new HashMap<>();
+        this.registerDAO = registerDAO;
+        this.registerLockDAO = moduleManager.find(StorageModule.NAME).getService(IRegisterLockDAO.class);
+        this.scope = scope;
+    }
+
+    /**
+     * Buffers the source if no equal one is buffered yet. When the source is flagged as end of
+     * batch and the register lock can be taken, every buffered source is written to storage and
+     * the buffer is emptied. If the lock is not obtained the buffer is kept for a later attempt.
+     */
+    @Override public final void in(RegisterSource registerSource) {
+        sources.putIfAbsent(registerSource, registerSource);
+
+        if (!registerSource.getEndOfBatchContext().isEndOfBatch()) {
+            return;
+        }
+
+        if (registerLockDAO.tryLock(scope)) {
+            try {
+                sources.values().forEach(this::persist);
+                // Everything buffered has been attempted; do not write it again on the next batch.
+                sources.clear();
+            } finally {
+                registerLockDAO.releaseLock(scope);
+            }
+        }
+    }
+
+    /**
+     * Writes one buffered source: a record already stored under the source's id is merged with
+     * the source and updated, otherwise the source gets a sequence and is inserted.
+     * Failures are logged and do not stop the remaining sources from being written.
+     */
+    private void persist(RegisterSource source) {
+        try {
+            RegisterSource dbSource = registerDAO.get(modelName, source.id());
+            if (Objects.nonNull(dbSource)) {
+                dbSource.combine(source);
+                registerDAO.forceUpdate(modelName, dbSource);
+            } else {
+                // NOTE(review): the value returned by max() is used as the new sequence as-is;
+                // confirm against the DAO whether it is the current maximum or the next free id.
+                int sequence = registerDAO.max(modelName);
+                source.setSequence(sequence);
+                registerDAO.forceInsert(modelName, source);
+            }
+        } catch (Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+    }
+}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
similarity index 53%
rename from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
rename to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
index fc9b48d..f8d14dd 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
@@ -16,11 +16,10 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.worker;
+package org.apache.skywalking.oap.server.core.register.worker;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
-import
org.apache.skywalking.oap.server.core.worker.annotation.WorkerAnnotationContainer;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.selector.Selector;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
@@ -30,35 +29,24 @@ import org.slf4j.*;
/**
* @author peng-yongsheng
*/
-public abstract class AbstractRemoteWorker<INPUT extends Indicator> extends
AbstractWorker<INPUT> {
+public class RegisterRemoteWorker extends AbstractWorker<RegisterSource> {
- private static final Logger logger =
LoggerFactory.getLogger(AbstractRemoteWorker.class);
+ private static final Logger logger =
LoggerFactory.getLogger(RegisterRemoteWorker.class);
- private final ModuleManager moduleManager;
- private RemoteSenderService remoteSender;
- private WorkerAnnotationContainer workerMapper;
+ private final AbstractWorker<RegisterSource> nextWorker;
+ private final RemoteSenderService remoteSender;
- public AbstractRemoteWorker(ModuleManager moduleManager) {
- this.moduleManager = moduleManager;
+ RegisterRemoteWorker(int workerId, ModuleManager moduleManager,
AbstractWorker<RegisterSource> nextWorker) {
+ super(workerId);
+ this.remoteSender =
moduleManager.find(CoreModule.NAME).getService(RemoteSenderService.class);
+ this.nextWorker = nextWorker;
}
- @Override public final void in(INPUT input) {
- if (remoteSender == null) {
- remoteSender =
moduleManager.find(CoreModule.NAME).getService(RemoteSenderService.class);
- }
- if (workerMapper == null) {
- workerMapper =
moduleManager.find(CoreModule.NAME).getService(WorkerAnnotationContainer.class);
- }
-
+ @Override public final void in(RegisterSource indicator) {
try {
- int nextWorkerId = workerMapper.findIdByClass(nextWorkerClass());
- remoteSender.send(nextWorkerId, input, selector());
+ remoteSender.send(nextWorker.getWorkerId(), indicator,
Selector.ForeverFirst);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
-
- public abstract Class nextWorkerClass();
-
- public abstract Selector selector();
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
index 9b94be8..16bcccf 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
@@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import
org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
-import
org.apache.skywalking.oap.server.core.worker.annotation.WorkerClassGetter;
+import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.slf4j.*;
@@ -38,7 +38,6 @@ public class RemoteServiceHandler extends
RemoteServiceGrpc.RemoteServiceImplBas
private final ModuleManager moduleManager;
private StreamDataClassGetter streamDataClassGetter;
- private WorkerClassGetter workerClassGetter;
public RemoteServiceHandler(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
@@ -48,9 +47,6 @@ public class RemoteServiceHandler extends
RemoteServiceGrpc.RemoteServiceImplBas
if (Objects.isNull(streamDataClassGetter)) {
streamDataClassGetter =
moduleManager.find(CoreModule.NAME).getService(StreamDataClassGetter.class);
}
- if (Objects.isNull(streamDataClassGetter)) {
- workerClassGetter =
moduleManager.find(CoreModule.NAME).getService(WorkerClassGetter.class);
- }
return new StreamObserver<RemoteMessage>() {
@Override public void onNext(RemoteMessage message) {
@@ -62,7 +58,7 @@ public class RemoteServiceHandler extends
RemoteServiceGrpc.RemoteServiceImplBas
try {
StreamData streamData = streamDataClass.newInstance();
streamData.deserialize(remoteData);
-
workerClassGetter.getClassById(nextWorkerId).newInstance().in(streamData);
+ WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData);
} catch (InstantiationException | IllegalAccessException e) {
logger.warn(e.getMessage());
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 361c982..9de03ca 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -20,8 +20,8 @@ package org.apache.skywalking.oap.server.core.remote.client;
import java.util.*;
import java.util.concurrent.*;
-import
org.apache.skywalking.oap.server.core.remote.annotation.StreamDataAnnotationContainer;
import org.apache.skywalking.oap.server.core.cluster.*;
+import
org.apache.skywalking.oap.server.core.remote.annotation.StreamDataAnnotationContainer;
import org.apache.skywalking.oap.server.library.module.*;
import org.slf4j.*;
@@ -96,7 +96,7 @@ public class RemoteClientManager implements Service {
client = currentClientsMap.get(address);
} else {
if (remoteInstance.isSelf()) {
- client = new SelfRemoteClient(moduleManager,
remoteInstance.getHost(), remoteInstance.getPort());
+ client = new SelfRemoteClient(remoteInstance.getHost(),
remoteInstance.getPort());
} else {
client = new GRPCRemoteClient(indicatorMapper,
remoteInstance, 1, 3000);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
index 9e0ba73..9b4f6bd 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
@@ -18,22 +18,18 @@
package org.apache.skywalking.oap.server.core.remote.client;
-import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import
org.apache.skywalking.oap.server.core.worker.annotation.WorkerAnnotationContainer;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
/**
* @author peng-yongsheng
*/
public class SelfRemoteClient implements RemoteClient {
- private final ModuleManager moduleManager;
private final String host;
private final int port;
- public SelfRemoteClient(ModuleManager moduleManager, String host, int
port) {
- this.moduleManager = moduleManager;
+ public SelfRemoteClient(String host, int port) {
this.host = host;
this.port = port;
}
@@ -47,7 +43,6 @@ public class SelfRemoteClient implements RemoteClient {
}
@Override public void push(int nextWorkerId, StreamData streamData) {
- WorkerAnnotationContainer workerMapper =
moduleManager.find(CoreModule.NAME).getService(WorkerAnnotationContainer.class);
- workerMapper.findInstanceById(nextWorkerId).in(streamData);
+ WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData);
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
index e744a48..3b30285 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
@@ -19,7 +19,6 @@
package org.apache.skywalking.oap.server.core.source;
import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
@@ -28,8 +27,8 @@ public class SourceReceiverImpl implements SourceReceiver {
private final DispatcherManager dispatcherManager;
- public SourceReceiverImpl(ModuleManager moduleManager) {
- this.dispatcherManager = new DispatcherManager(moduleManager);
+ public SourceReceiverImpl() {
+ this.dispatcherManager = new DispatcherManager();
}
@Override public void receive(Source source) {
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IIndicatorDAO.java
similarity index 72%
copy from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
copy to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IIndicatorDAO.java
index 2a9d1f9..3dbf348 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IIndicatorDAO.java
@@ -24,13 +24,13 @@ import
org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
/**
* @author peng-yongsheng
*/
-public interface IPersistenceDAO<INSERT, UPDATE, INPUT extends Indicator>
extends DAO {
+public interface IIndicatorDAO<INSERT, UPDATE> extends DAO {
- INPUT get(INPUT input) throws IOException;
+ Indicator get(String modelName, Indicator indicator) throws IOException;
- INSERT prepareBatchInsert(INPUT input) throws IOException;
+ INSERT prepareBatchInsert(String modelName, Indicator indicator) throws
IOException;
- UPDATE prepareBatchUpdate(INPUT input) throws IOException;
+ UPDATE prepareBatchUpdate(String modelName, Indicator indicator) throws
IOException;
- void deleteHistory(Long timeBucketBefore);
+ void deleteHistory(String modelName, Long timeBucketBefore);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterDAO.java
similarity index 69%
rename from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
rename to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterDAO.java
index 2a9d1f9..18350a9 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterDAO.java
@@ -19,18 +19,18 @@
package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
-import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
/**
* @author peng-yongsheng
*/
-public interface IPersistenceDAO<INSERT, UPDATE, INPUT extends Indicator>
extends DAO {
+public interface IRegisterDAO extends DAO {
- INPUT get(INPUT input) throws IOException;
+ int max(String modelName) throws IOException;
- INSERT prepareBatchInsert(INPUT input) throws IOException;
+ RegisterSource get(String modelName, String id) throws IOException;
- UPDATE prepareBatchUpdate(INPUT input) throws IOException;
+ void forceInsert(String modelName, RegisterSource source) throws
IOException;
- void deleteHistory(Long timeBucketBefore);
+ void forceUpdate(String modelName, RegisterSource source) throws
IOException;
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/Worker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageBuilder.java
similarity index 78%
rename from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/Worker.java
rename to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageBuilder.java
index bc4c495..faf41dd 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/Worker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageBuilder.java
@@ -16,14 +16,16 @@
*
*/
-package org.apache.skywalking.oap.server.core.worker.annotation;
+package org.apache.skywalking.oap.server.core.storage;
-import java.lang.annotation.*;
+import java.util.Map;
/**
* @author peng-yongsheng
*/
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.RUNTIME)
-public @interface Worker {
+public interface StorageBuilder<T extends StorageData> {
+ // Two-way converter between a StorageData subtype T and a String-keyed map.
+ // Referenced from the StorageEntity annotation's builder() element.
+
+ /**
+  * Builds a {@code T} from a String-keyed map.
+  *
+  * @param dbMap source map; presumably column name to stored value as read
+  *              back from the storage backend (TODO confirm against implementations)
+  * @return the populated data object
+  */
+ T map2Data(Map<String, Object> dbMap);
+
+ /**
+  * Flattens a {@code T} into a String-keyed map.
+  *
+  * @param storageData the object to convert
+  * @return the map form; presumably the inverse of {@link #map2Data(Map)}
+  *         (TODO confirm against implementations)
+  */
+ Map<String, Object> data2Map(T storageData);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerClassGetter.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
similarity index 78%
rename from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerClassGetter.java
rename to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
index 83b5b8d..b6cf4d6 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerClassGetter.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
@@ -16,14 +16,15 @@
*
*/
-package org.apache.skywalking.oap.server.core.worker.annotation;
+package org.apache.skywalking.oap.server.core.storage;
-import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
-public interface WorkerClassGetter extends Service {
- Class<AbstractWorker> getClassById(int workerId);
+public interface StorageDAO extends Service {
+ // Module service that hands out indicator DAOs; StorageModule lists this
+ // type among its services.
+
+ /**
+  * Returns an indicator DAO for the given builder.
+  *
+  * NOTE(review): the return type is the raw {@code IIndicatorDAO}, although the
+  * interface is declared as {@code IIndicatorDAO<INSERT, UPDATE>}; callers get
+  * no compile-time checking of the batch insert/update types.
+  *
+  * @param storageBuilder converter between {@code Indicator} objects and
+  *                       String-keyed maps
+  * @return an indicator DAO; whether a new instance is created per call is
+  *         up to the implementation (not visible here)
+  */
+ IIndicatorDAO newIndicatorDao(StorageBuilder<Indicator> storageBuilder);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageData.java
similarity index 85%
copy from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
copy to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageData.java
index fd73d4c..d7de8b8 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageData.java
@@ -16,12 +16,11 @@
*
*/
-package org.apache.skywalking.oap.server.core.worker;
+package org.apache.skywalking.oap.server.core.storage;
/**
* @author peng-yongsheng
*/
-public abstract class AbstractWorker<INPUT> {
-
- public abstract void in(INPUT input);
+public interface StorageData {
+ String id();
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
index 62eb72d..2e4beeb 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
@@ -32,6 +32,6 @@ public class StorageModule extends ModuleDefine {
}
@Override public Class[] services() {
- return new Class[] {IBatchDAO.class, IPersistenceDAO.class,
IRegisterLockDAO.class};
+ return new Class[] {IBatchDAO.class, StorageDAO.class,
IRegisterLockDAO.class};
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
index aa6828b..adbf45e 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
@@ -23,8 +23,12 @@ import java.lang.annotation.*;
/**
* @author peng-yongsheng
*/
-@Target(ElementType.FIELD)
+@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Column {
String columnName();
+
+ boolean matchQuery() default false;
+
+ boolean termQuery() default true;
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntity.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Query.java
similarity index 86%
copy from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntity.java
copy to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Query.java
index cfc7ce9..5f5de01 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntity.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Query.java
@@ -18,13 +18,9 @@
package org.apache.skywalking.oap.server.core.storage.annotation;
-import java.lang.annotation.*;
-
/**
* @author peng-yongsheng
*/
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.RUNTIME)
-public @interface StorageEntity {
- String name();
+public enum Query {
+ // Two query kinds: Term and Match.
+ // NOTE(review): presumably mirrors the termQuery()/matchQuery() elements of
+ // the Column annotation, but nothing visible in this change references this
+ // enum; confirm it is used.
+ Term, Match
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageAnnotationListener.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageAnnotationListener.java
index 894326c..fe134aa 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageAnnotationListener.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageAnnotationListener.java
@@ -49,8 +49,8 @@ public class StorageAnnotationListener implements
AnnotationListener, IModelGett
List<ModelColumn> modelColumns = new LinkedList<>();
retrieval(aClass, modelColumns);
- StorageEntity annotation =
(StorageEntity)aClass.getAnnotation(StorageEntity.class);
- models.add(new Model(annotation.name(), modelColumns));
+ String modelName = StorageEntityAnnotationUtils.getModelName(aClass);
+ models.add(new Model(modelName, modelColumns));
}
private void retrieval(Class clazz, List<ModelColumn> modelColumns) {
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntity.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntity.java
index cfc7ce9..7bf8cab 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntity.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntity.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.storage.annotation;
import java.lang.annotation.*;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
/**
* @author peng-yongsheng
@@ -27,4 +28,6 @@ import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME)
public @interface StorageEntity {
String name();
+
+ Class<? extends StorageBuilder> builder();
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntityAnnotationUtils.java
similarity index 51%
copy from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
copy to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntityAnnotationUtils.java
index aa6828b..b754039 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntityAnnotationUtils.java
@@ -18,13 +18,29 @@
package org.apache.skywalking.oap.server.core.storage.annotation;
-import java.lang.annotation.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
/**
* @author peng-yongsheng
*/
-@Target(ElementType.FIELD)
-@Retention(RetentionPolicy.RUNTIME)
-public @interface Column {
- String columnName();
+public class StorageEntityAnnotationUtils {
+
+    /**
+     * Reads the model name declared through {@link StorageEntity} on a class.
+     *
+     * @param aClass the class expected to carry {@link StorageEntity}
+     * @return the annotation's {@code name()} value
+     * @throws UnexpectedException if the class is not annotated with {@link StorageEntity}
+     */
+    public static String getModelName(Class aClass) {
+        return requireStorageEntity(aClass).name();
+    }
+
+    /**
+     * Reads the builder type declared through {@link StorageEntity} on a class.
+     *
+     * @param aClass the class expected to carry {@link StorageEntity}
+     * @return the annotation's {@code builder()} value
+     * @throws UnexpectedException if the class is not annotated with {@link StorageEntity}
+     */
+    public static Class<? extends StorageBuilder> getBuilder(Class aClass) {
+        return requireStorageEntity(aClass).builder();
+    }
+
+    /**
+     * Returns the {@link StorageEntity} annotation of the class, or fails with a
+     * message that names the offending class.
+     */
+    private static StorageEntity requireStorageEntity(Class aClass) {
+        if (!aClass.isAnnotationPresent(StorageEntity.class)) {
+            // The message used to be empty, which made a missing annotation
+            // impossible to trace back to the class that caused it.
+            throw new UnexpectedException("Class " + aClass.getName() + " is not annotated with @StorageEntity.");
+        }
+        return (StorageEntity)aClass.getAnnotation(StorageEntity.class);
+    }
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerDefineLoadException.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/IEndpointCacheDAO.java
similarity index 73%
rename from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerDefineLoadException.java
rename to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/IEndpointCacheDAO.java
index 1b24afa..e010e77 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerDefineLoadException.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/IEndpointCacheDAO.java
@@ -16,14 +16,17 @@
*
*/
-package org.apache.skywalking.oap.server.core.worker.annotation;
+package org.apache.skywalking.oap.server.core.storage.cache;
+
+import org.apache.skywalking.oap.server.core.register.endpoint.Endpoint;
+import org.apache.skywalking.oap.server.core.storage.DAO;
/**
* @author peng-yongsheng
*/
-public class WorkerDefineLoadException extends RuntimeException {
+public interface IEndpointCacheDAO extends DAO {
+
+ int get(String id);
- public WorkerDefineLoadException(String message, Throwable cause) {
- super(message, cause);
- }
+ Endpoint get(int sequence);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
index fd73d4c..c079a11 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
@@ -18,10 +18,18 @@
package org.apache.skywalking.oap.server.core.worker;
+import lombok.Getter;
+
/**
* @author peng-yongsheng
*/
public abstract class AbstractWorker<INPUT> {
+ @Getter private final int workerId;
+
+ public AbstractWorker(int workerId) {
+ this.workerId = workerId;
+ }
+
public abstract void in(INPUT input);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerIdGenerator.java
similarity index 85%
copy from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
copy to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerIdGenerator.java
index fd73d4c..fc7f5af 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerIdGenerator.java
@@ -21,7 +21,12 @@ package org.apache.skywalking.oap.server.core.worker;
/**
* @author peng-yongsheng
*/
-public abstract class AbstractWorker<INPUT> {
+public enum WorkerIdGenerator {
+ INSTANCES;
- public abstract void in(INPUT input);
+ private int workerId = 0;
+
+ public synchronized int generate() {
+ return workerId++;
+ }
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstances.java
similarity index 72%
copy from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
copy to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstances.java
index fd73d4c..a80dbd5 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstances.java
@@ -18,10 +18,21 @@
package org.apache.skywalking.oap.server.core.worker;
+import java.util.*;
+
/**
* @author peng-yongsheng
*/
-public abstract class AbstractWorker<INPUT> {
+public enum WorkerInstances {
+ INSTANCES;
+
+ private Map<Integer, AbstractWorker> instances = new HashMap<>();
+
+ public void put(int workerId, AbstractWorker instance) {
+ instances.put(workerId, instance);
+ }
- public abstract void in(INPUT input);
+ public AbstractWorker get(int workerId) {
+ return instances.get(workerId);
+ }
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerAnnotationContainer.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerAnnotationContainer.java
deleted file mode 100644
index c06cabc..0000000
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerAnnotationContainer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.worker.annotation;
-
-import java.lang.reflect.Constructor;
-import java.util.*;
-import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.slf4j.*;
-
-/**
- * @author peng-yongsheng
- */
-public class WorkerAnnotationContainer implements WorkerClassGetter {
-
- private static final Logger logger =
LoggerFactory.getLogger(WorkerAnnotationContainer.class);
-
- private int id = 0;
- private final Map<Class<AbstractWorker>, Integer> classKeyMapping;
- private final Map<Integer, Class<AbstractWorker>> idKeyMapping;
- private final Map<Class<AbstractWorker>, AbstractWorker>
classKeyInstanceMapping;
- private final Map<Integer, AbstractWorker> idKeyInstanceMapping;
-
- public WorkerAnnotationContainer() {
- this.classKeyMapping = new HashMap<>();
- this.idKeyMapping = new HashMap<>();
- this.classKeyInstanceMapping = new HashMap<>();
- this.idKeyInstanceMapping = new HashMap<>();
- }
-
- @SuppressWarnings(value = "unchecked")
- public void load(ModuleManager moduleManager, List<Class> workerClasses)
throws WorkerDefineLoadException {
- if (Objects.isNull(workerClasses)) {
- return;
- }
-
- try {
- for (Class workerClass : workerClasses) {
- id++;
- classKeyMapping.put(workerClass, id);
- idKeyMapping.put(id, workerClass);
-
- Constructor<AbstractWorker> constructor =
workerClass.getDeclaredConstructor(ModuleManager.class);
- AbstractWorker worker = constructor.newInstance(moduleManager);
- classKeyInstanceMapping.put(workerClass, worker);
- idKeyInstanceMapping.put(id, worker);
- }
- } catch (Throwable t) {
- throw new WorkerDefineLoadException(t.getMessage(), t);
- }
- }
-
- @Override public Class<AbstractWorker> getClassById(int workerId) {
- return idKeyMapping.get(id);
- }
-
- public int findIdByClass(Class workerClass) {
- return classKeyMapping.get(workerClass);
- }
-
- public AbstractWorker findInstanceByClass(Class workerClass) {
- return classKeyInstanceMapping.get(workerClass);
- }
-
- public AbstractWorker findInstanceById(int id) {
- return idKeyInstanceMapping.get(id);
- }
-}
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
index fae8893..757215d 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
@@ -18,9 +18,8 @@
package org.apache.skywalking.oap.server.core.analysis.indicator.define;
-import java.util.Map;
import lombok.*;
-import org.apache.skywalking.oap.server.core.analysis.indicator.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
/**
@@ -34,22 +33,10 @@ public class TestAvgIndicator extends AvgIndicator {
return null;
}
- @Override public String name() {
- return null;
- }
-
@Override public void deserialize(RemoteData remoteData) {
}
@Override public String id() {
return null;
}
-
- @Override public Map<String, Object> toMap() {
- return null;
- }
-
- @Override public Indicator newOne(Map<String, Object> dbMap) {
- return null;
- }
}
diff --git
a/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def
b/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def
deleted file mode 100644
index 97491ff..0000000
--- a/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-
-org.apache.skywalking.oap.server.core.analysis.indicator.define.TestAvgIndicator
\ No newline at end of file
diff --git
a/oap-server/server-core/src/test/resources/META-INF/defines/worker.def
b/oap-server/server-core/src/test/resources/META-INF/defines/worker.def
deleted file mode 100644
index 33ebbb1..0000000
--- a/oap-server/server-core/src/test/resources/META-INF/defines/worker.def
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
\ No newline at end of file
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index db1db63..e11c4a6 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -63,7 +63,7 @@ public class StorageModuleElasticsearchProvider extends
ModuleProvider {
elasticSearchClient = new
ElasticSearchClient(config.getClusterNodes(), nameSpace);
this.registerServiceImplementation(IBatchDAO.class, new
BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(),
config.getBulkSize(), config.getFlushInterval(),
config.getConcurrentRequests()));
- this.registerServiceImplementation(IPersistenceDAO.class, new
PersistenceEsDAO(elasticSearchClient, nameSpace));
+ this.registerServiceImplementation(StorageDAO.class, new
StorageEsDAO(elasticSearchClient));
this.registerServiceImplementation(IRegisterLockDAO.class, new
RegisterLockDAOImpl(elasticSearchClient, 1000));
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
index dd0a70b..e7c8138 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
@@ -18,54 +18,15 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
-import java.io.IOException;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.metrics.max.Max;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
- private static final Logger logger = LoggerFactory.getLogger(EsDAO.class);
-
public EsDAO(ElasticSearchClient client) {
super(client);
}
-
- protected final int getMaxId(String indexName, String columnName) {
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
-
searchSourceBuilder.aggregation(AggregationBuilders.max("agg").field(columnName));
- searchSourceBuilder.size(0);
- return getResponse(indexName, searchSourceBuilder);
- }
-
- protected final int getMinId(String indexName, String columnName) {
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
-
searchSourceBuilder.aggregation(AggregationBuilders.min("agg").field(columnName));
- searchSourceBuilder.size(0);
- return getResponse(indexName, searchSourceBuilder);
- }
-
- private int getResponse(String indexName, SearchSourceBuilder
searchSourceBuilder) {
- try {
- SearchResponse searchResponse = getClient().search(indexName,
searchSourceBuilder);
- Max agg = searchResponse.getAggregations().get("agg");
-
- int id = (int)agg.getValue();
- if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
- return 0;
- } else {
- return id;
- }
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- }
- return 0;
- }
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndicatorEsDAO.java
similarity index 60%
rename from
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java
rename to
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndicatorEsDAO.java
index 1f83b63..37b4d47 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndicatorEsDAO.java
@@ -21,8 +21,7 @@ package
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
-import org.apache.skywalking.oap.server.core.storage.IPersistenceDAO;
-import org.apache.skywalking.oap.server.library.client.NameSpace;
+import org.apache.skywalking.oap.server.core.storage.*;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
@@ -32,48 +31,46 @@ import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
*/
-public class PersistenceEsDAO implements IPersistenceDAO<IndexRequest,
UpdateRequest, Indicator> {
+public class IndicatorEsDAO extends EsDAO implements
IIndicatorDAO<IndexRequest, UpdateRequest> {
- private final ElasticSearchClient client;
- private final NameSpace nameSpace;
+ private final StorageBuilder<Indicator> storageBuilder;
- public PersistenceEsDAO(ElasticSearchClient client, NameSpace nameSpace) {
- this.client = client;
- this.nameSpace = nameSpace;
+ public IndicatorEsDAO(ElasticSearchClient client,
StorageBuilder<Indicator> storageBuilder) {
+ super(client);
+ this.storageBuilder = storageBuilder;
}
- @Override public Indicator get(Indicator input) throws IOException {
- GetResponse response = client.get(nameSpace.getNameSpace() + "_" +
input.name(), input.id());
+ @Override public Indicator get(String modelName, Indicator indicator)
throws IOException {
+ GetResponse response = getClient().get(modelName, indicator.id());
if (response.isExists()) {
- return input.newOne(response.getSource());
+ return storageBuilder.map2Data(response.getSource());
} else {
return null;
}
}
- @Override public IndexRequest prepareBatchInsert(Indicator input) throws
IOException {
- Map<String, Object> objectMap = input.toMap();
+ @Override public IndexRequest prepareBatchInsert(String modelName,
Indicator indicator) throws IOException {
+ Map<String, Object> objectMap = storageBuilder.data2Map(indicator);
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
for (String key : objectMap.keySet()) {
builder.field(key, objectMap.get(key));
}
builder.endObject();
- return client.prepareInsert(nameSpace.getNameSpace() + "_" +
input.name(), input.id(), builder);
+ return getClient().prepareInsert(modelName, indicator.id(), builder);
}
- @Override public UpdateRequest prepareBatchUpdate(Indicator input) throws
IOException {
- Map<String, Object> objectMap = input.toMap();
+ @Override public UpdateRequest prepareBatchUpdate(String modelName,
Indicator indicator) throws IOException {
+ Map<String, Object> objectMap = storageBuilder.data2Map(indicator);
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
for (String key : objectMap.keySet()) {
builder.field(key, objectMap.get(key));
}
builder.endObject();
- return client.prepareUpdate(nameSpace.getNameSpace() + "_" +
input.name(), input.id(), builder);
+ return getClient().prepareUpdate(modelName, indicator.id(), builder);
}
- @Override public void deleteHistory(Long timeBucketBefore) {
-
+ @Override public void deleteHistory(String modelName, Long
timeBucketBefore) {
}
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java
new file mode 100644
index 0000000..13c6711
--- /dev/null
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.oap.server.core.storage.*;
+import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.xcontent.*;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.metrics.max.Max;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RegisterEsDAO extends EsDAO implements IRegisterDAO {
+
+ private static final Logger logger =
LoggerFactory.getLogger(RegisterEsDAO.class);
+
+ private final StorageBuilder<RegisterSource> storageBuilder;
+
+ public RegisterEsDAO(ElasticSearchClient client,
StorageBuilder<RegisterSource> storageBuilder) {
+ super(client);
+ this.storageBuilder = storageBuilder;
+ }
+
+ @Override public RegisterSource get(String modelName, String id) throws
IOException {
+ GetResponse response = getClient().get(modelName, id);
+ if (response.isExists()) {
+ return storageBuilder.map2Data(response.getSource());
+ } else {
+ return null;
+ }
+ }
+
+ @Override public void forceInsert(String modelName, RegisterSource source)
throws IOException {
+ Map<String, Object> objectMap = storageBuilder.data2Map(source);
+
+ XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+ for (String key : objectMap.keySet()) {
+ builder.field(key, objectMap.get(key));
+ }
+ builder.endObject();
+
+ getClient().forceInsert(modelName, source.id(), builder);
+ }
+
+ @Override public void forceUpdate(String modelName, RegisterSource source)
throws IOException {
+ Map<String, Object> objectMap = storageBuilder.data2Map(source);
+
+ XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+ for (String key : objectMap.keySet()) {
+ builder.field(key, objectMap.get(key));
+ }
+ builder.endObject();
+
+ getClient().forceUpdate(modelName, source.id(), builder);
+ }
+
+ @Override public int max(String modelName) throws IOException {
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+
searchSourceBuilder.aggregation(AggregationBuilders.max(RegisterSource.SEQUENCE).field(RegisterSource.SEQUENCE));
+ searchSourceBuilder.size(0);
+ return getResponse(modelName, searchSourceBuilder);
+ }
+
+ private int getResponse(String modelName, SearchSourceBuilder
searchSourceBuilder) throws IOException {
+ SearchResponse searchResponse = getClient().search(modelName,
searchSourceBuilder);
+ Max agg =
searchResponse.getAggregations().get(RegisterSource.SEQUENCE);
+
+ int id = (int)agg.getValue();
+ if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
+ return 0;
+ } else {
+ return id;
+ }
+ }
+}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
similarity index 58%
rename from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
rename to
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
index 99a8239..641582f 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
@@ -16,23 +16,22 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.endpoint;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
-import
org.apache.skywalking.oap.server.core.analysis.worker.AbstractPersistentWorker;
-import org.apache.skywalking.oap.server.core.worker.annotation.Worker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.storage.*;
+import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
/**
* @author peng-yongsheng
*/
-@Worker
-public class EndpointLatencyAvgPersistentWorker extends
AbstractPersistentWorker<EndpointLatencyAvgIndicator> {
+public class StorageEsDAO extends EsDAO implements StorageDAO {
- public EndpointLatencyAvgPersistentWorker(ModuleManager moduleManager) {
- super(moduleManager);
+ public StorageEsDAO(ElasticSearchClient client) {
+ super(client);
}
- @Override protected boolean needMergeDBData() {
- return true;
+ @Override public IIndicatorDAO newIndicatorDao(StorageBuilder<Indicator>
storageBuilder) {
+ return new IndicatorEsDAO(getClient(), storageBuilder);
}
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/EndpointCacheEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/EndpointCacheEsDAO.java
new file mode 100644
index 0000000..8645bc8
--- /dev/null
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/EndpointCacheEsDAO.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache;
+
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.oap.server.core.register.endpoint.Endpoint;
+import org.apache.skywalking.oap.server.core.storage.cache.IEndpointCacheDAO;
+import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.elasticsearch.action.get.GetResponse;
+
+/**
+ * @author peng-yongsheng
+ */
+public class EndpointCacheEsDAO extends EsDAO implements IEndpointCacheDAO {
+
+ public EndpointCacheEsDAO(ElasticSearchClient client) {
+ super(client);
+ }
+
+ @Override public int get(String id) {
+ try {
+ GetResponse response = getClient().get("", id);
+ if (response.isExists()) {
+ return response.getField(RegisterSource.SEQUENCE).getValue();
+ } else {
+ return 0;
+ }
+ } catch (Throwable e) {
+ return 0;
+ }
+ }
+
+ @Override public Endpoint get(int sequence) {
+ return null;
+ }
+}