This is an automated email from the ASF dual-hosted git repository.

pengys pushed a commit to branch 6.0
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/6.0 by this push:
     new a5ad06c  Refactor register and analysis modules. (#1539)
a5ad06c is described below

commit a5ad06ce46feca3585bae12e5b82be7095fe021f
Author: 彭勇升 pengys <[email protected]>
AuthorDate: Tue Aug 14 14:39:03 2018 +0800

    Refactor register and analysis modules. (#1539)
    
    * Refactor register and analysis modules.
    
    * Fixed the startup error.
---
 ...ntLatencyAvgAggregateWorker.java => Const.java} |  31 +++---
 .../skywalking/oap/server/core/CoreModule.java     |   2 -
 .../oap/server/core/CoreModuleProvider.java        |  14 +--
 .../server/core/analysis/DispatcherManager.java    |   5 +-
 .../core/analysis/endpoint/EndpointDispatcher.java |  18 +--
 .../endpoint/EndpointLatencyAvgIndicator.java      |  60 +++++-----
 .../endpoint/EndpointLatencyAvgRemoteWorker.java   |  43 -------
 .../core/analysis/indicator/AvgIndicator.java      |   3 +-
 .../server/core/analysis/indicator/Indicator.java  |  10 +-
 .../indicator/annotation/IndicatorOperator.java}   |   4 +-
 .../indicator/annotation/IndicatorType.java        |   6 +-
 .../annotation/IndicatorTypeListener.java}         |  23 ++--
 ...orWorker.java => IndicatorAggregateWorker.java} |  71 +++++-------
 ...tWorker.java => IndicatorPersistentWorker.java} |  59 +++++-----
 .../core/analysis/worker/IndicatorProcess.java     |  64 +++++++++++
 ...emoteWorker.java => IndicatorRemoteWorker.java} |  32 ++----
 .../server/core/cache/EndpointCacheService.java    |  95 ++++++++++++++++
 .../RegisterSource.java}                           |  28 ++---
 .../server/core/register/endpoint/Endpoint.java    | 124 +++++++++++++++++++++
 .../register/worker/RegisterDistinctWorker.java    |  99 ++++++++++++++++
 .../register/worker/RegisterPersistentWorker.java  |  81 ++++++++++++++
 .../worker/RegisterRemoteWorker.java}              |  36 ++----
 .../server/core/remote/RemoteServiceHandler.java   |   8 +-
 .../core/remote/client/RemoteClientManager.java    |   4 +-
 .../core/remote/client/SelfRemoteClient.java       |  11 +-
 .../oap/server/core/source/SourceReceiverImpl.java |   5 +-
 .../{IPersistenceDAO.java => IIndicatorDAO.java}   |  10 +-
 .../{IPersistenceDAO.java => IRegisterDAO.java}    |  12 +-
 .../Worker.java => storage/StorageBuilder.java}    |  12 +-
 .../StorageDAO.java}                               |   9 +-
 .../StorageData.java}                              |   7 +-
 .../oap/server/core/storage/StorageModule.java     |   2 +-
 .../oap/server/core/storage/annotation/Column.java |   6 +-
 .../annotation/{StorageEntity.java => Query.java}  |   8 +-
 .../annotation/StorageAnnotationListener.java      |   4 +-
 .../core/storage/annotation/StorageEntity.java     |   3 +
 ...lumn.java => StorageEntityAnnotationUtils.java} |  26 ++++-
 .../cache/IEndpointCacheDAO.java}                  |  13 ++-
 .../oap/server/core/worker/AbstractWorker.java     |   8 ++
 ...{AbstractWorker.java => WorkerIdGenerator.java} |   9 +-
 .../{AbstractWorker.java => WorkerInstances.java}  |  15 ++-
 .../annotation/WorkerAnnotationContainer.java      |  84 --------------
 .../indicator/define/TestAvgIndicator.java         |  15 +--
 .../test/resources/META-INF/defines/indicator.def  |  19 ----
 .../src/test/resources/META-INF/defines/worker.def |  17 ---
 .../StorageModuleElasticsearchProvider.java        |   2 +-
 .../storage/plugin/elasticsearch/base/EsDAO.java   |  39 -------
 .../{PersistenceEsDAO.java => IndicatorEsDAO.java} |  35 +++---
 .../plugin/elasticsearch/base/RegisterEsDAO.java   |  99 ++++++++++++++++
 .../plugin/elasticsearch/base/StorageEsDAO.java}   |  19 ++--
 .../elasticsearch/cache/EndpointCacheEsDAO.java    |  53 +++++++++
 51 files changed, 908 insertions(+), 554 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
similarity index 51%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
index d5388c3..55779f9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
@@ -16,23 +16,24 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.endpoint;
-
-import org.apache.skywalking.oap.server.core.analysis.worker.AbstractAggregatorWorker;
-import org.apache.skywalking.oap.server.core.worker.annotation.Worker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+package org.apache.skywalking.oap.server.core;
 
 /**
  * @author peng-yongsheng
  */
-@Worker
-public class EndpointLatencyAvgAggregateWorker extends AbstractAggregatorWorker<EndpointLatencyAvgIndicator> {
-
-    public EndpointLatencyAvgAggregateWorker(ModuleManager moduleManager) {
-        super(moduleManager);
-    }
-
-    @Override public Class nextWorkerClass() {
-        return EndpointLatencyAvgRemoteWorker.class;
-    }
+public class Const {
+    public static final int NONE = 0;
+    public static final String ID_SPLIT = "_";
+    public static final int NONE_APPLICATION_ID = 1;
+    public static final int NONE_INSTANCE_ID = 1;
+    public static final int NONE_SERVICE_ID = 1;
+    public static final String NONE_SERVICE_NAME = "None";
+    public static final String USER_CODE = "User";
+    public static final String SEGMENT_SPAN_SPLIT = "S";
+    public static final String UNKNOWN = "Unknown";
+    public static final String EXCEPTION = "Exception";
+    public static final String EMPTY_STRING = "";
+    public static final String FILE_SUFFIX = "sw";
+    public static final int SPAN_TYPE_VIRTUAL = 9;
+    public static final String DOMAIN_OPERATION_NAME = "{domain}";
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index 2b0860a..27d1da6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -25,7 +25,6 @@ import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
 import org.apache.skywalking.oap.server.core.server.*;
 import org.apache.skywalking.oap.server.core.source.SourceReceiver;
 import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
-import org.apache.skywalking.oap.server.core.worker.annotation.WorkerAnnotationContainer;
 import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 
 /**
@@ -56,7 +55,6 @@ public class CoreModule extends ModuleDefine {
     private void addInsideService(List<Class> classes) {
         classes.add(IModelGetter.class);
         classes.add(StreamDataClassGetter.class);
-        classes.add(WorkerAnnotationContainer.class);
         classes.add(RemoteClientManager.class);
         classes.add(RemoteSenderService.class);
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 9b428f8..8e3d137 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core;
 
 import java.io.IOException;
+import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
 import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
 import org.apache.skywalking.oap.server.core.cluster.*;
 import org.apache.skywalking.oap.server.core.remote.*;
@@ -28,7 +29,6 @@ import org.apache.skywalking.oap.server.core.server.*;
 import org.apache.skywalking.oap.server.core.source.*;
 import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener;
 import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
-import org.apache.skywalking.oap.server.core.worker.annotation.*;
 import org.apache.skywalking.oap.server.library.module.*;
 import org.apache.skywalking.oap.server.library.server.ServerException;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
@@ -48,9 +48,7 @@ public class CoreModuleProvider extends ModuleProvider {
     private final AnnotationScan annotationScan;
     private final StorageAnnotationListener storageAnnotationListener;
     private final StreamAnnotationListener streamAnnotationListener;
-    private final WorkerAnnotationListener workerAnnotationListener;
     private final StreamDataAnnotationContainer streamDataAnnotationContainer;
-    private final WorkerAnnotationContainer workerAnnotationContainer;
 
     public CoreModuleProvider() {
         super();
@@ -58,9 +56,7 @@ public class CoreModuleProvider extends ModuleProvider {
         this.annotationScan = new AnnotationScan();
         this.storageAnnotationListener = new StorageAnnotationListener();
         this.streamAnnotationListener = new StreamAnnotationListener();
-        this.workerAnnotationListener = new WorkerAnnotationListener();
         this.streamDataAnnotationContainer = new StreamDataAnnotationContainer();
-        this.workerAnnotationContainer = new WorkerAnnotationContainer();
     }
 
     @Override public String name() {
@@ -85,10 +81,9 @@ public class CoreModuleProvider extends ModuleProvider {
         this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
         this.registerServiceImplementation(JettyHandlerRegister.class, new JettyHandlerRegisterImpl(jettyServer));
 
-        this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl(getManager()));
+        this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl());
 
         this.registerServiceImplementation(StreamDataClassGetter.class, streamDataAnnotationContainer);
-        this.registerServiceImplementation(WorkerAnnotationContainer.class, workerAnnotationContainer);
 
         this.registerServiceImplementation(RemoteClientManager.class, new RemoteClientManager(getManager()));
         this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
@@ -96,7 +91,7 @@ public class CoreModuleProvider extends ModuleProvider {
 
         annotationScan.registerListener(storageAnnotationListener);
         annotationScan.registerListener(streamAnnotationListener);
-        annotationScan.registerListener(workerAnnotationListener);
+        annotationScan.registerListener(new IndicatorTypeListener(getManager()));
     }
 
     @Override public void start() throws ModuleStartException {
@@ -105,9 +100,8 @@ public class CoreModuleProvider extends ModuleProvider {
         try {
             annotationScan.scan(() -> {
                 streamDataAnnotationContainer.generate(streamAnnotationListener.getStreamClasses());
-                workerAnnotationContainer.load(getManager(), workerAnnotationListener.getWorkerClasses());
             });
-        } catch (WorkerDefineLoadException | IOException e) {
+        } catch (IOException e) {
             throw new ModuleStartException(e.getMessage(), e);
         }
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index 3dea269..4c0ab0f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -21,7 +21,6 @@ package org.apache.skywalking.oap.server.core.analysis;
 import java.util.*;
 import org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointDispatcher;
 import org.apache.skywalking.oap.server.core.source.Scope;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.slf4j.*;
 
 /**
@@ -33,9 +32,9 @@ public class DispatcherManager {
 
     private Map<Scope, SourceDispatcher> dispatcherMap;
 
-    public DispatcherManager(ModuleManager moduleManager) {
+    public DispatcherManager() {
         this.dispatcherMap = new HashMap<>();
-        this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher(moduleManager));
+        this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher());
     }
 
     public SourceDispatcher getDispatcher(Scope scope) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
index b7aef09..9bf8838 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
@@ -18,38 +18,24 @@
 
 package org.apache.skywalking.oap.server.core.analysis.endpoint;
 
-import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
-import org.apache.skywalking.oap.server.core.worker.annotation.WorkerAnnotationContainer;
+import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
 import org.apache.skywalking.oap.server.core.source.Endpoint;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
  * @author peng-yongsheng
  */
 public class EndpointDispatcher implements SourceDispatcher<Endpoint> {
 
-    private final ModuleManager moduleManager;
-    private EndpointLatencyAvgAggregateWorker avgAggregator;
-
-    public EndpointDispatcher(ModuleManager moduleManager) {
-        this.moduleManager = moduleManager;
-    }
-
     @Override public void dispatch(Endpoint source) {
         avg(source);
     }
 
     private void avg(Endpoint source) {
-        if (avgAggregator == null) {
-            WorkerAnnotationContainer workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerAnnotationContainer.class);
-            avgAggregator = (EndpointLatencyAvgAggregateWorker)workerMapper.findInstanceByClass(EndpointLatencyAvgAggregateWorker.class);
-        }
-
         EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator();
         indicator.setId(source.getId());
         indicator.setTimeBucket(source.getTimeBucket());
         indicator.combine(source.getLatency(), 1);
-        avgAggregator.in(indicator);
+        IndicatorProcess.INSTANCE.in(indicator);
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
index 6872acb..718d357 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
@@ -20,20 +20,21 @@ package org.apache.skywalking.oap.server.core.analysis.endpoint;
 
 import java.util.*;
 import lombok.*;
-import org.apache.skywalking.oap.server.core.analysis.indicator.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
 import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
 import org.apache.skywalking.oap.server.core.storage.annotation.*;
 
 /**
  * @author peng-yongsheng
  */
+@IndicatorType
 @StreamData
-@StorageEntity(name = EndpointLatencyAvgIndicator.NAME)
+@StorageEntity(name = "endpoint_latency_avg", builder = EndpointLatencyAvgIndicator.Builder.class)
 public class EndpointLatencyAvgIndicator extends AvgIndicator {
 
-    public static final String NAME = "endpoint_latency_avg";
-
     private static final String ID = "id";
     private static final String SERVICE_ID = "service_id";
     private static final String SERVICE_INSTANCE_ID = "service_instance_id";
@@ -42,10 +43,6 @@ public class EndpointLatencyAvgIndicator extends AvgIndicator {
     @Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
     @Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) private int serviceInstanceId;
 
-    @Override public String name() {
-        return NAME;
-    }
-
     @Override public String id() {
         return String.valueOf(id);
     }
@@ -99,27 +96,30 @@ public class EndpointLatencyAvgIndicator extends AvgIndicator {
         setValue(remoteData.getDataLongs(2));
     }
 
-    @Override public Map<String, Object> toMap() {
-        Map<String, Object> map = new HashMap<>();
-        map.put(ID, id);
-        map.put(SERVICE_ID, serviceId);
-        map.put(SERVICE_INSTANCE_ID, serviceInstanceId);
-        map.put(COUNT, getCount());
-        map.put(SUMMATION, getSummation());
-        map.put(VALUE, getValue());
-        map.put(TIME_BUCKET, getTimeBucket());
-        return map;
-    }
-
-    @Override public Indicator newOne(Map<String, Object> dbMap) {
-        EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator();
-        indicator.setId((Integer)dbMap.get(ID));
-        indicator.setServiceId((Integer)dbMap.get(SERVICE_ID));
-        indicator.setServiceInstanceId((Integer)dbMap.get(SERVICE_INSTANCE_ID));
-        indicator.setCount((Integer)dbMap.get(COUNT));
-        indicator.setSummation((Long)dbMap.get(SUMMATION));
-        indicator.setValue((Long)dbMap.get(VALUE));
-        indicator.setTimeBucket((Long)dbMap.get(TIME_BUCKET));
-        return indicator;
+    public static class Builder implements StorageBuilder<EndpointLatencyAvgIndicator> {
+
+        @Override public EndpointLatencyAvgIndicator map2Data(Map<String, Object> dbMap) {
+            EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator();
+            indicator.setId((Integer)dbMap.get(ID));
+            indicator.setServiceId((Integer)dbMap.get(SERVICE_ID));
+            indicator.setServiceInstanceId((Integer)dbMap.get(SERVICE_INSTANCE_ID));
+            indicator.setCount((Integer)dbMap.get(COUNT));
+            indicator.setSummation((Long)dbMap.get(SUMMATION));
+            indicator.setValue((Long)dbMap.get(VALUE));
+            indicator.setTimeBucket((Long)dbMap.get(TIME_BUCKET));
+            return indicator;
+        }
+
+        @Override public Map<String, Object> data2Map(EndpointLatencyAvgIndicator storageData) {
+            Map<String, Object> map = new HashMap<>();
+            map.put(ID, storageData.getId());
+            map.put(SERVICE_ID, storageData.getServiceId());
+            map.put(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId());
+            map.put(COUNT, storageData.getCount());
+            map.put(SUMMATION, storageData.getSummation());
+            map.put(VALUE, storageData.getValue());
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
+            return map;
+        }
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
deleted file mode 100644
index f36483a..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgRemoteWorker.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.endpoint;
-
-import org.apache.skywalking.oap.server.core.analysis.worker.AbstractRemoteWorker;
-import org.apache.skywalking.oap.server.core.remote.selector.Selector;
-import org.apache.skywalking.oap.server.core.worker.annotation.Worker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-
-/**
- * @author peng-yongsheng
- */
-@Worker
-public class EndpointLatencyAvgRemoteWorker extends AbstractRemoteWorker<EndpointLatencyAvgIndicator> {
-
-    public EndpointLatencyAvgRemoteWorker(ModuleManager moduleManager) {
-        super(moduleManager);
-    }
-
-    @Override public Selector selector() {
-        return Selector.HashCode;
-    }
-
-    @Override public Class nextWorkerClass() {
-        return EndpointLatencyAvgPersistentWorker.class;
-    }
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
index 1bae8ec..41f24c8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
@@ -20,13 +20,12 @@ package org.apache.skywalking.oap.server.core.analysis.indicator;
 
 import lombok.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
-import org.apache.skywalking.oap.server.core.remote.selector.Selector;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 
 /**
  * @author peng-yongsheng
  */
-@IndicatorType(selector = Selector.HashCode, needMerge = true)
+@IndicatorOperator
 public abstract class AvgIndicator extends Indicator {
 
     protected static final String SUMMATION = "summation";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
index 439dee5..fe29ed6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
@@ -18,15 +18,15 @@
 
 package org.apache.skywalking.oap.server.core.analysis.indicator;
 
-import java.util.Map;
 import lombok.*;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class Indicator extends StreamData {
+public abstract class Indicator extends StreamData implements StorageData {
 
     protected static final String TIME_BUCKET = "time_bucket";
 
@@ -35,10 +35,4 @@ public abstract class Indicator extends StreamData {
     public abstract String id();
 
     public abstract void combine(Indicator indicator);
-
-    public abstract String name();
-
-    public abstract Map<String, Object> toMap();
-
-    public abstract Indicator newOne(Map<String, Object> dbMap);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/Worker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorOperator.java
similarity index 89%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/Worker.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorOperator.java
index bc4c495..60ee119 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/Worker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorOperator.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.worker.annotation;
+package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
 
 import java.lang.annotation.*;
 
@@ -25,5 +25,5 @@ import java.lang.annotation.*;
  */
 @Target(ElementType.TYPE)
 @Retention(RetentionPolicy.RUNTIME)
-public @interface Worker {
+public @interface IndicatorOperator {
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
index d1ad273..5543257 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
@@ -19,15 +19,11 @@
 package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
 
 import java.lang.annotation.*;
-import org.apache.skywalking.oap.server.core.remote.selector.Selector;
 
 /**
  * @author peng-yongsheng
  */
 @Target(ElementType.TYPE)
-@Retention(RetentionPolicy.SOURCE)
+@Retention(RetentionPolicy.RUNTIME)
 public @interface IndicatorType {
-    Selector selector();
-
-    boolean needMerge();
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerAnnotationListener.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorTypeListener.java
similarity index 64%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerAnnotationListener.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorTypeListener.java
index 51f5875..bffcbbf 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerAnnotationListener.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorTypeListener.java
@@ -16,34 +16,29 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.worker.annotation;
+package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
 
 import java.lang.annotation.Annotation;
-import java.util.*;
-import lombok.Getter;
+import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
 import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
  * @author peng-yongsheng
  */
-public class WorkerAnnotationListener implements AnnotationListener {
+public class IndicatorTypeListener implements AnnotationListener {
 
-    private static final Logger logger = LoggerFactory.getLogger(WorkerAnnotationListener.class);
+    private final ModuleManager moduleManager;
 
-    @Getter private final List<Class> workerClasses;
-
-    public WorkerAnnotationListener() {
-        this.workerClasses = new LinkedList<>();
+    public IndicatorTypeListener(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
     }
 
     @Override public Class<? extends Annotation> annotation() {
-        return Worker.class;
+        return IndicatorType.class;
     }
 
     @Override public void notify(Class aClass) {
-        logger.info("The owner class of worker annotation, class name: {}", aClass.getName());
-
-        workerClasses.add(aClass);
+        IndicatorProcess.INSTANCE.create(moduleManager, aClass);
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
similarity index 54%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index 3e420c9..8d0bf88 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -21,44 +21,41 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
 import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.analysis.data.*;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
-import org.apache.skywalking.oap.server.core.worker.annotation.WorkerAnnotationContainer;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class AbstractAggregatorWorker<INPUT extends Indicator> extends AbstractWorker<INPUT> {
+public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
 
-    private static final Logger logger = LoggerFactory.getLogger(AbstractAggregatorWorker.class);
+    private static final Logger logger = LoggerFactory.getLogger(IndicatorAggregateWorker.class);
 
-    private AbstractWorker worker;
-    private final ModuleManager moduleManager;
-    private final DataCarrier<INPUT> dataCarrier;
-    private final MergeDataCache<INPUT> mergeDataCache;
+    private AbstractWorker<Indicator> nextWorker;
+    private final DataCarrier<Indicator> dataCarrier;
+    private final MergeDataCache<Indicator> mergeDataCache;
     private int messageNum;
 
-    public AbstractAggregatorWorker(ModuleManager moduleManager) {
-        this.moduleManager = moduleManager;
+    IndicatorAggregateWorker(int workerId, AbstractWorker<Indicator> nextWorker) {
+        super(workerId);
+        this.nextWorker = nextWorker;
         this.mergeDataCache = new MergeDataCache<>();
         this.dataCarrier = new DataCarrier<>(1, 10000);
         this.dataCarrier.consume(new AggregatorConsumer(this), 1);
     }
 
-    @Override public final void in(INPUT input) {
-        input.setEndOfBatchContext(new EndOfBatchContext(false));
-        dataCarrier.produce(input);
+    @Override public final void in(Indicator indicator) {
+        indicator.setEndOfBatchContext(new EndOfBatchContext(false));
+        dataCarrier.produce(indicator);
     }
 
-    private void onWork(INPUT message) {
+    private void onWork(Indicator indicator) {
         messageNum++;
-        aggregate(message);
+        aggregate(indicator);
 
-        if (messageNum >= 1000 || message.getEndOfBatchContext().isEndOfBatch()) {
+        if (messageNum >= 1000 || indicator.getEndOfBatchContext().isEndOfBatch()) {
             sendToNext();
             messageNum = 0;
         }
@@ -74,41 +71,31 @@ public abstract class AbstractAggregatorWorker<INPUT 
extends Indicator> extends
             }
         }
 
-        mergeDataCache.getLast().collection().forEach((INPUT key, INPUT data) 
-> {
+        mergeDataCache.getLast().collection().forEach((Indicator key, 
Indicator data) -> {
             if (logger.isDebugEnabled()) {
                 logger.debug(data.toString());
             }
 
-            onNext(data);
+            nextWorker.in(data);
         });
         mergeDataCache.finishReadingLast();
     }
 
-    private void onNext(INPUT data) {
-        if (worker == null) {
-            WorkerAnnotationContainer workerMapper = 
moduleManager.find(CoreModule.NAME).getService(WorkerAnnotationContainer.class);
-            worker = workerMapper.findInstanceByClass(nextWorkerClass());
-        }
-        worker.in(data);
-    }
-
-    public abstract Class nextWorkerClass();
-
-    private void aggregate(INPUT message) {
+    private void aggregate(Indicator indicator) {
         mergeDataCache.writing();
-        if (mergeDataCache.containsKey(message)) {
-            mergeDataCache.get(message).combine(message);
+        if (mergeDataCache.containsKey(indicator)) {
+            mergeDataCache.get(indicator).combine(indicator);
         } else {
-            mergeDataCache.put(message);
+            mergeDataCache.put(indicator);
         }
         mergeDataCache.finishWriting();
     }
 
-    private class AggregatorConsumer implements IConsumer<INPUT> {
+    private class AggregatorConsumer implements IConsumer<Indicator> {
 
-        private final AbstractAggregatorWorker<INPUT> aggregator;
+        private final IndicatorAggregateWorker aggregator;
 
-        private AggregatorConsumer(AbstractAggregatorWorker<INPUT> aggregator) 
{
+        private AggregatorConsumer(IndicatorAggregateWorker aggregator) {
             this.aggregator = aggregator;
         }
 
@@ -116,21 +103,21 @@ public abstract class AbstractAggregatorWorker<INPUT 
extends Indicator> extends
 
         }
 
-        @Override public void consume(List<INPUT> data) {
-            Iterator<INPUT> inputIterator = data.iterator();
+        @Override public void consume(List<Indicator> data) {
+            Iterator<Indicator> inputIterator = data.iterator();
 
             int i = 0;
             while (inputIterator.hasNext()) {
-                INPUT input = inputIterator.next();
+                Indicator indicator = inputIterator.next();
                 i++;
                 if (i == data.size()) {
-                    input.getEndOfBatchContext().setEndOfBatch(true);
+                    indicator.getEndOfBatchContext().setEndOfBatch(true);
                 }
-                aggregator.onWork(input);
+                aggregator.onWork(indicator);
             }
         }
 
-        @Override public void onError(List<INPUT> data, Throwable t) {
+        @Override public void onError(List<Indicator> data, Throwable t) {
             logger.error(t.getMessage(), t);
         }
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
similarity index 68%
rename from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
rename to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index a8bee9f..dd140c8 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -31,26 +31,31 @@ import static java.util.Objects.nonNull;
 /**
  * @author peng-yongsheng
  */
-public abstract class AbstractPersistentWorker<INPUT extends Indicator> 
extends AbstractWorker<INPUT> {
+public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(AbstractPersistentWorker.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(IndicatorPersistentWorker.class);
 
-    private final MergeDataCache<INPUT> mergeDataCache;
+    private final String modelName;
+    private final MergeDataCache<Indicator> mergeDataCache;
     private final IBatchDAO batchDAO;
-    private final IPersistenceDAO<?, ?, INPUT> persistenceDAO;
-    private final int blockBatchPersistenceSize = 1000;
-
-    public AbstractPersistentWorker(ModuleManager moduleManager) {
+    private final IIndicatorDAO indicatorDAO;
+    private final int blockBatchPersistenceSize;
+
+    IndicatorPersistentWorker(int workerId, String modelName, int batchSize, 
ModuleManager moduleManager,
+        IIndicatorDAO indicatorDAO) {
+        super(workerId);
+        this.modelName = modelName;
+        this.blockBatchPersistenceSize = batchSize;
         this.mergeDataCache = new MergeDataCache<>();
         this.batchDAO = 
moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
-        this.persistenceDAO = 
moduleManager.find(StorageModule.NAME).getService(IPersistenceDAO.class);
+        this.indicatorDAO = indicatorDAO;
     }
 
-    public final Window<MergeDataCollection<INPUT>> getCache() {
+    public final Window<MergeDataCollection<Indicator>> getCache() {
         return mergeDataCache;
     }
 
-    @Override public final void in(INPUT input) {
+    @Override public final void in(Indicator input) {
         if (getCache().currentCollectionSize() >= blockBatchPersistenceSize) {
             try {
                 if (getCache().trySwitchPointer()) {
@@ -86,33 +91,25 @@ public abstract class AbstractPersistentWorker<INPUT 
extends Indicator> extends
         return batchCollection;
     }
 
-    private List<Object> prepareBatch(MergeDataCollection<INPUT> collection) {
+    private List<Object> prepareBatch(MergeDataCollection<Indicator> 
collection) {
         List<Object> batchCollection = new LinkedList<>();
         collection.collection().forEach((id, data) -> {
-            if (needMergeDBData()) {
-                INPUT dbData = null;
+            Indicator dbData = null;
+            try {
+                dbData = indicatorDAO.get(modelName, data);
+            } catch (Throwable t) {
+                logger.error(t.getMessage(), t);
+            }
+            if (nonNull(dbData)) {
+                dbData.combine(data);
                 try {
-                    dbData = persistenceDAO.get(data);
+                    
batchCollection.add(indicatorDAO.prepareBatchUpdate(modelName, dbData));
                 } catch (Throwable t) {
                     logger.error(t.getMessage(), t);
                 }
-                if (nonNull(dbData)) {
-                    dbData.combine(data);
-                    try {
-                        
batchCollection.add(persistenceDAO.prepareBatchUpdate(dbData));
-                    } catch (Throwable t) {
-                        logger.error(t.getMessage(), t);
-                    }
-                } else {
-                    try {
-                        
batchCollection.add(persistenceDAO.prepareBatchInsert(data));
-                    } catch (Throwable t) {
-                        logger.error(t.getMessage(), t);
-                    }
-                }
             } else {
                 try {
-                    
batchCollection.add(persistenceDAO.prepareBatchInsert(data));
+                    
batchCollection.add(indicatorDAO.prepareBatchInsert(modelName, data));
                 } catch (Throwable t) {
                     logger.error(t.getMessage(), t);
                 }
@@ -122,7 +119,7 @@ public abstract class AbstractPersistentWorker<INPUT 
extends Indicator> extends
         return batchCollection;
     }
 
-    private void cacheData(INPUT input) {
+    private void cacheData(Indicator input) {
         mergeDataCache.writing();
         if (mergeDataCache.containsKey(input)) {
             mergeDataCache.get(input).combine(input);
@@ -132,6 +129,4 @@ public abstract class AbstractPersistentWorker<INPUT 
extends Indicator> extends
 
         mergeDataCache.finishWriting();
     }
-
-    protected abstract boolean needMergeDBData();
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
new file mode 100644
index 0000000..398d61d
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import java.util.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.storage.*;
+import 
org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
+import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * @author peng-yongsheng
+ */
+public enum IndicatorProcess {
+    INSTANCE;
+
+    private Map<Class<? extends Indicator>, IndicatorAggregateWorker> 
entryWorkers = new HashMap<>();
+
+    /**
+     * Hands the indicator to the entry worker registered for its concrete class.
+     *
+     * @param indicator the indicator to process; the worker is looked up by {@code indicator.getClass()}
+     */
+    public void in(Indicator indicator) {
+        Class<? extends Indicator> indicatorClass = indicator.getClass();
+        IndicatorAggregateWorker entryWorker = entryWorkers.get(indicatorClass);
+        entryWorker.in(indicator);
+    }
+
+    /**
+     * Builds the worker chain for one indicator class and registers its entry point.
+     * <p>
+     * Three workers are created and stored in {@code WorkerInstances} under their generated ids:
+     * a persistent worker (batch size 1000), a remote worker that forwards to the persistent
+     * worker, and an aggregate worker that forwards to the remote worker. The aggregate worker
+     * is recorded as the entry worker for {@code indicatorClass}.
+     *
+     * @param moduleManager used to find the storage module's {@code StorageDAO}; also passed to the workers
+     * @param indicatorClass indicator type whose model name and storage builder are resolved through
+     * {@code StorageEntityAnnotationUtils}
+     * @throws UnexpectedException if the storage builder class cannot be instantiated
+     */
+    public void create(ModuleManager moduleManager, Class<? extends Indicator> indicatorClass) {
+        String modelName = StorageEntityAnnotationUtils.getModelName(indicatorClass);
+        Class<? extends StorageBuilder> builderClass = StorageEntityAnnotationUtils.getBuilder(indicatorClass);
+
+        StorageDAO storageDAO = moduleManager.find(StorageModule.NAME).getService(StorageDAO.class);
+        IIndicatorDAO indicatorDAO;
+        try {
+            indicatorDAO = storageDAO.newIndicatorDao(builderClass.newInstance());
+        } catch (InstantiationException | IllegalAccessException e) {
+            // Name the builder and the failure so a startup error is diagnosable. Only the
+            // message-only constructor is used, so the cause is folded into the text.
+            throw new UnexpectedException("Failed to instantiate storage builder " + builderClass.getName()
+                + " for indicator " + indicatorClass.getName() + ": " + e);
+        }
+
+        IndicatorPersistentWorker persistentWorker = new IndicatorPersistentWorker(
+            WorkerIdGenerator.INSTANCES.generate(), modelName, 1000, moduleManager, indicatorDAO);
+        WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
+
+        IndicatorRemoteWorker remoteWorker = new IndicatorRemoteWorker(
+            WorkerIdGenerator.INSTANCES.generate(), moduleManager, persistentWorker);
+        WorkerInstances.INSTANCES.put(remoteWorker.getWorkerId(), remoteWorker);
+
+        IndicatorAggregateWorker aggregateWorker = new IndicatorAggregateWorker(
+            WorkerIdGenerator.INSTANCES.generate(), remoteWorker);
+        WorkerInstances.INSTANCES.put(aggregateWorker.getWorkerId(), aggregateWorker);
+
+        entryWorkers.put(indicatorClass, aggregateWorker);
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java
similarity index 59%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java
index fc9b48d..0295864 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java
@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
-import 
org.apache.skywalking.oap.server.core.worker.annotation.WorkerAnnotationContainer;
 import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
 import org.apache.skywalking.oap.server.core.remote.selector.Selector;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
@@ -30,35 +29,24 @@ import org.slf4j.*;
 /**
  * @author peng-yongsheng
  */
-public abstract class AbstractRemoteWorker<INPUT extends Indicator> extends 
AbstractWorker<INPUT> {
+public class IndicatorRemoteWorker extends AbstractWorker<Indicator> {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(AbstractRemoteWorker.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(IndicatorRemoteWorker.class);
 
-    private final ModuleManager moduleManager;
-    private RemoteSenderService remoteSender;
-    private WorkerAnnotationContainer workerMapper;
+    private final AbstractWorker<Indicator> nextWorker;
+    private final RemoteSenderService remoteSender;
 
-    public AbstractRemoteWorker(ModuleManager moduleManager) {
-        this.moduleManager = moduleManager;
+    IndicatorRemoteWorker(int workerId, ModuleManager moduleManager, 
AbstractWorker<Indicator> nextWorker) {
+        super(workerId);
+        this.remoteSender = 
moduleManager.find(CoreModule.NAME).getService(RemoteSenderService.class);
+        this.nextWorker = nextWorker;
     }
 
-    @Override public final void in(INPUT input) {
-        if (remoteSender == null) {
-            remoteSender = 
moduleManager.find(CoreModule.NAME).getService(RemoteSenderService.class);
-        }
-        if (workerMapper == null) {
-            workerMapper = 
moduleManager.find(CoreModule.NAME).getService(WorkerAnnotationContainer.class);
-        }
-
+    @Override public final void in(Indicator indicator) {
         try {
-            int nextWorkerId = workerMapper.findIdByClass(nextWorkerClass());
-            remoteSender.send(nextWorkerId, input, selector());
+            remoteSender.send(nextWorker.getWorkerId(), indicator, 
Selector.HashCode);
         } catch (Throwable e) {
             logger.error(e.getMessage(), e);
         }
     }
-
-    public abstract Class nextWorkerClass();
-
-    public abstract Selector selector();
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/EndpointCacheService.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/EndpointCacheService.java
new file mode 100644
index 0000000..71e5814
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/EndpointCacheService.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.cache;
+
+import com.google.common.cache.*;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.register.endpoint.Endpoint;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.cache.IEndpointCacheDAO;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.slf4j.*;
+
+import static java.util.Objects.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class EndpointCacheService implements Service {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(EndpointCacheService.class);
+
+    private final ModuleManager moduleManager;
+    private IEndpointCacheDAO cacheDAO;
+
+    /**
+     * Keeps the module manager; the cache DAO is not resolved here but on first use.
+     *
+     * @param moduleManager used later to find the storage module's {@code IEndpointCacheDAO} service
+     */
+    public EndpointCacheService(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
+    }
+
+    private final Cache<String, Integer> idCache = 
CacheBuilder.newBuilder().initialCapacity(1000).maximumSize(1000000).build();
+
+    private final Cache<Integer, Endpoint> sequenceCache = 
CacheBuilder.newBuilder().initialCapacity(1000).maximumSize(1000000).build();
+
+    /**
+     * Looks up the endpoint id for the key
+     * {@code serviceId + ID_SPLIT + serviceName + ID_SPLIT + srcSpanType}.
+     * <p>
+     * The id cache is consulted first, loading through the cache DAO on a miss. When that
+     * yields 0 (treated here as "not found", also the value left after a load failure), the
+     * DAO is queried directly and a non-zero answer is written back to the cache.
+     *
+     * @param serviceId id of the owning service
+     * @param serviceName name part of the key
+     * @param srcSpanType span type part of the key
+     * @return the endpoint id, or 0 when neither lookup produced a non-zero id
+     */
+    public int get(int serviceId, String serviceName, int srcSpanType) {
+        String id = serviceId + Const.ID_SPLIT + serviceName + Const.ID_SPLIT + srcSpanType;
+
+        int endpointId = 0;
+
+        try {
+            endpointId = idCache.get(id, () -> getCacheDAO().get(id));
+        } catch (Throwable e) {
+            logger.error(e.getMessage(), e);
+        }
+
+        // Retry against storage whenever the lookup result is 0. The guard must test the
+        // result (endpointId), not the serviceId argument, otherwise a cached 0 is never refreshed.
+        if (endpointId == 0) {
+            endpointId = getCacheDAO().get(id);
+            if (endpointId != 0) {
+                idCache.put(id, endpointId);
+            }
+        }
+        return endpointId;
+    }
+
+    /**
+     * Returns the endpoint registered under the given id.
+     * <p>
+     * The sequence cache is tried first (loading through the cache DAO); a load failure is
+     * logged. If that gives nothing, the DAO is asked directly: a hit is cached, a miss is
+     * logged as a warning.
+     *
+     * @param endpointId the endpoint id to resolve
+     * @return the endpoint, or {@code null} when it is found neither in the cache nor by the DAO
+     */
+    public Endpoint get(int endpointId) {
+        Endpoint cached = null;
+        try {
+            cached = sequenceCache.get(endpointId, () -> getCacheDAO().get(endpointId));
+        } catch (Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+        if (nonNull(cached)) {
+            return cached;
+        }
+
+        Endpoint stored = getCacheDAO().get(endpointId);
+        if (isNull(stored)) {
+            logger.warn("Endpoint id {} is not in cache and persistent storage.", endpointId);
+            return stored;
+        }
+
+        sequenceCache.put(endpointId, stored);
+        return stored;
+    }
+
+    /**
+     * Returns the endpoint cache DAO, fetching it from the storage module the first time it is needed.
+     */
+    private IEndpointCacheDAO getCacheDAO() {
+        if (nonNull(cacheDAO)) {
+            return cacheDAO;
+        }
+        cacheDAO = moduleManager.find(StorageModule.NAME).getService(IEndpointCacheDAO.class);
+        return cacheDAO;
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
similarity index 54%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
index 439dee5..abb6c9e 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
@@ -16,29 +16,29 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.indicator;
+package org.apache.skywalking.oap.server.core.register;
 
-import java.util.Map;
 import lombok.*;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class Indicator extends StreamData {
+public abstract class RegisterSource extends StreamData implements StorageData 
{
 
-    protected static final String TIME_BUCKET = "time_bucket";
+    public static final String SEQUENCE = "sequence";
+    protected static final String REGISTER_TIME = "register_time";
+    protected static final String HEARTBEAT_TIME = "heartbeat_time";
 
-    @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
+    @Getter @Setter @Column(columnName = SEQUENCE) private int sequence;
+    @Getter @Setter @Column(columnName = REGISTER_TIME) private long 
registerTime;
+    @Getter @Setter @Column(columnName = HEARTBEAT_TIME) private long 
heartbeatTime;
 
-    public abstract String id();
-
-    public abstract void combine(Indicator indicator);
-
-    public abstract String name();
-
-    public abstract Map<String, Object> toMap();
-
-    public abstract Indicator newOne(Map<String, Object> dbMap);
+    public final void combine(RegisterSource registerSource) {
+        if (heartbeatTime < registerSource.getHeartbeatTime()) {
+            heartbeatTime = registerSource.getHeartbeatTime();
+        }
+    }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/endpoint/Endpoint.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/endpoint/Endpoint.java
new file mode 100644
index 0000000..8536955
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/endpoint/Endpoint.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.register.endpoint;
+
+import java.util.*;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
+
+/**
+ * @author peng-yongsheng
+ */
+@StreamData
+@StorageEntity(name = "endpoint", builder = Endpoint.Builder.class)
+public class Endpoint extends RegisterSource {
+
+    private static final String SERVICE_ID = "service_id";
+    private static final String NAME = "name";
+    private static final String SRC_SPAN_TYPE = "src_span_type";
+
+    @Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
+    @Setter @Getter @Column(columnName = NAME, matchQuery = true) private 
String name;
+    @Setter @Getter @Column(columnName = SRC_SPAN_TYPE) private int 
srcSpanType;
+
+    /**
+     * Builds the id as service id, name and source span type joined by {@code Const.ID_SPLIT}.
+     */
+    @Override public String id() {
+        StringBuilder idBuilder = new StringBuilder();
+        idBuilder.append(String.valueOf(serviceId));
+        idBuilder.append(Const.ID_SPLIT);
+        idBuilder.append(name);
+        idBuilder.append(Const.ID_SPLIT);
+        idBuilder.append(String.valueOf(srcSpanType));
+        return idBuilder.toString();
+    }
+
+    /**
+     * Hashes the three identity fields: service id, name and source span type (seed 17, multiplier 31).
+     */
+    @Override public int hashCode() {
+        int hash = 31 * 17 + serviceId;
+        hash = 31 * hash + name.hashCode();
+        return 31 * hash + srcSpanType;
+    }
+
+    /**
+     * Two endpoints are equal when they are of the same class and have the same service id,
+     * name and source span type, the same fields that {@link #hashCode()} uses.
+     *
+     * @param obj the object to compare with
+     * @return {@code true} if {@code obj} is an endpoint with identical identity fields
+     */
+    @Override public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+
+        Endpoint source = (Endpoint)obj;
+        if (serviceId != source.getServiceId())
+            return false;
+        // Differing names mean "not equal"; the check must be negated.
+        if (!name.equals(source.getName()))
+            return false;
+        if (srcSpanType != source.getSrcSpanType())
+            return false;
+
+        return true;
+    }
+
+    /**
+     * Writes this endpoint into a new {@code RemoteData} builder.
+     * <p>
+     * Values are written positionally: integers are sequence, service id, source span type;
+     * longs are register time, heartbeat time; the only string is the name.
+     *
+     * @return the populated builder
+     */
+    @Override public RemoteData.Builder serialize() {
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+        // The builder starts empty, so values are appended in order. The indexed setters of a
+        // protobuf repeated field replace an existing element and fail on an empty list.
+        // NOTE(review): assumes RemoteData is the protobuf-generated message its package suggests.
+        remoteBuilder.addDataIntegers(getSequence());
+        remoteBuilder.addDataIntegers(serviceId);
+        remoteBuilder.addDataIntegers(srcSpanType);
+
+        remoteBuilder.addDataLongs(getRegisterTime());
+        remoteBuilder.addDataLongs(getHeartbeatTime());
+
+        remoteBuilder.addDataStrings(name);
+        return remoteBuilder;
+    }
+
+    /**
+     * Restores this endpoint from remote data, reading the positions that {@code serialize()} writes:
+     * integers 0..2 are sequence, service id, source span type; longs 0..1 are register time,
+     * heartbeat time; string 0 is the name.
+     *
+     * @param remoteData the received data
+     */
+    @Override public void deserialize(RemoteData remoteData) {
+        setSequence(remoteData.getDataIntegers(0));
+        setServiceId(remoteData.getDataIntegers(1));
+        setSrcSpanType(remoteData.getDataIntegers(2));
+
+        setRegisterTime(remoteData.getDataLongs(0));
+        setHeartbeatTime(remoteData.getDataLongs(1));
+
+        // serialize() stores the name as the only string, at position 0.
+        setName(remoteData.getDataStrings(0));
+    }
+
+    /**
+     * Converts between {@link Endpoint} and the column-name keyed map used by storage.
+     */
+    public static class Builder implements StorageBuilder<Endpoint> {
+
+        /**
+         * Creates an endpoint from a storage row.
+         * <p>
+         * Numeric columns are read through {@link Number} rather than cast to a fixed boxed type.
+         * NOTE(review): presumably some storage backends return e.g. an Integer for a small long
+         * value; confirm per backend.
+         *
+         * @param dbMap column name to value; every column written by {@link #data2Map(Endpoint)} is expected
+         * @return the populated endpoint
+         */
+        @Override public Endpoint map2Data(Map<String, Object> dbMap) {
+            Endpoint endpoint = new Endpoint();
+            endpoint.setSequence(((Number)dbMap.get(SEQUENCE)).intValue());
+            endpoint.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
+            endpoint.setName((String)dbMap.get(NAME));
+            endpoint.setSrcSpanType(((Number)dbMap.get(SRC_SPAN_TYPE)).intValue());
+            endpoint.setRegisterTime(((Number)dbMap.get(REGISTER_TIME)).longValue());
+            endpoint.setHeartbeatTime(((Number)dbMap.get(HEARTBEAT_TIME)).longValue());
+            return endpoint;
+        }
+
+        /**
+         * Flattens an endpoint into a map keyed by column name.
+         *
+         * @param storageData the endpoint to convert
+         * @return a new map holding sequence, service id, name, source span type, register time and heartbeat time
+         */
+        @Override public Map<String, Object> data2Map(Endpoint storageData) {
+            Map<String, Object> map = new HashMap<>();
+            map.put(SEQUENCE, storageData.getSequence());
+            map.put(SERVICE_ID, storageData.getServiceId());
+            map.put(NAME, storageData.getName());
+            map.put(SRC_SPAN_TYPE, storageData.getSrcSpanType());
+            map.put(REGISTER_TIME, storageData.getRegisterTime());
+            map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime());
+            return map;
+        }
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
new file mode 100644
index 0000000..640e159
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.register.worker;
+
+import java.util.*;
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(RegisterDistinctWorker.class);
+
+    private final AbstractWorker<RegisterSource> nextWorker;
+    private final DataCarrier<RegisterSource> dataCarrier;
+    private final Map<RegisterSource, RegisterSource> sources;
+    private int messageNum;
+
+    /**
+     * Creates the worker with an empty source map and a data carrier whose consumer calls back into this worker.
+     *
+     * @param workerId id passed to the {@code AbstractWorker} constructor
+     * @param nextWorker worker that receives the collected sources on each flush
+     */
+    public RegisterDistinctWorker(int workerId, AbstractWorker<RegisterSource> nextWorker) {
+        super(workerId);
+        this.sources = new HashMap<>();
+        this.nextWorker = nextWorker;
+        this.dataCarrier = new DataCarrier<>(1, 10000);
+        // Registered last, after every field the consumer touches has been assigned.
+        this.dataCarrier.consume(new AggregatorConsumer(this), 1);
+    }
+
+    /**
+     * Marks the source as "not end of batch" and queues it on the data carrier.
+     *
+     * @param source the register source to enqueue
+     */
+    @Override public final void in(RegisterSource source) {
+        EndOfBatchContext notEndOfBatch = new EndOfBatchContext(false);
+        source.setEndOfBatchContext(notEndOfBatch);
+        dataCarrier.produce(source);
+    }
+
+    /**
+     * Collects one source, merging it into an equal source that is already held, and flushes
+     * the collected sources to the next worker after 1000 messages or at the end of a batch.
+     *
+     * @param source the source taken from the data carrier
+     */
+    private void onWork(RegisterSource source) {
+        messageNum++;
+
+        // First occurrence: remember it. Repeat occurrence: fold it into the stored instance.
+        RegisterSource existing = sources.putIfAbsent(source, source);
+        if (existing != null) {
+            existing.combine(source);
+        }
+
+        if (messageNum >= 1000 || source.getEndOfBatchContext().isEndOfBatch()) {
+            sources.values().forEach(nextWorker::in);
+            // Start the next window empty so flushed sources are not sent again and the map cannot grow without bound.
+            sources.clear();
+            messageNum = 0;
+        }
+    }
+
+    /**
+     * Data carrier consumer that feeds every consumed source to the owning worker's {@code onWork}.
+     */
+    private class AggregatorConsumer implements IConsumer<RegisterSource> {
+
+        private final RegisterDistinctWorker aggregator;
+
+        private AggregatorConsumer(RegisterDistinctWorker aggregator) {
+            this.aggregator = aggregator;
+        }
+
+        /** Nothing to initialise. */
+        @Override public void init() {
+        }
+
+        /**
+         * Passes the sources on one by one, flagging the last element of the list as end of batch.
+         */
+        @Override public void consume(List<RegisterSource> sources) {
+            int position = 0;
+            for (RegisterSource source : sources) {
+                position++;
+                boolean isLast = position == sources.size();
+                if (isLast) {
+                    source.getEndOfBatchContext().setEndOfBatch(true);
+                }
+                aggregator.onWork(source);
+            }
+        }
+
+        /** Logs the failure; the affected sources are not retried here. */
+        @Override public void onError(List<RegisterSource> sources, Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+
+        /** Nothing to release. */
+        @Override public void onExit() {
+        }
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
new file mode 100644
index 0000000..0e35373
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.register.worker;
+
+import java.util.*;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.oap.server.core.source.Scope;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+/**
+ * Buffers incoming register sources and, when a source flagged as end-of-batch arrives,
+ * writes the buffered sources to storage while holding the register lock for the
+ * configured scope.
+ *
+ * @author peng-yongsheng
+ */
+public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
+
+    private static final Logger logger = LoggerFactory.getLogger(RegisterPersistentWorker.class);
+
+    private final Scope scope;
+    private final String modelName;
+    private final Map<RegisterSource, RegisterSource> sources;
+    private final IRegisterLockDAO registerLockDAO;
+    private final IRegisterDAO registerDAO;
+
+    /**
+     * @param workerId id of this worker
+     * @param modelName storage model name passed to every {@link IRegisterDAO} call
+     * @param moduleManager used to look up the {@link IRegisterLockDAO} service of the storage module
+     * @param registerDAO DAO used to read, insert and update register sources
+     * @param scope scope passed to the register lock
+     */
+    public RegisterPersistentWorker(int workerId, String modelName, ModuleManager moduleManager,
+        IRegisterDAO registerDAO, Scope scope) {
+        super(workerId);
+        this.modelName = modelName;
+        this.sources = new HashMap<>();
+        this.registerDAO = registerDAO;
+        this.registerLockDAO = moduleManager.find(StorageModule.NAME).getService(IRegisterLockDAO.class);
+        this.scope = scope;
+    }
+
+    /**
+     * Buffers the source (the first instance seen for a key is kept) and, if it is flagged as
+     * the end of a batch, persists everything buffered so far under the register lock.
+     */
+    @Override public final void in(RegisterSource registerSource) {
+        sources.putIfAbsent(registerSource, registerSource);
+
+        if (!registerSource.getEndOfBatchContext().isEndOfBatch()) {
+            return;
+        }
+
+        if (!registerLockDAO.tryLock(scope)) {
+            // Lock not obtained: leave the buffer untouched so the next end-of-batch retries it.
+            return;
+        }
+
+        try {
+            sources.values().forEach(this::persist);
+            // Everything buffered has been attempted; reset so it is not written again
+            // and the buffer does not grow without bound.
+            sources.clear();
+        } finally {
+            registerLockDAO.releaseLock(scope);
+        }
+    }
+
+    /**
+     * Updates the stored record when one exists for the source's id, otherwise assigns a
+     * sequence and inserts the source. Failures are logged and do not stop the batch.
+     */
+    private void persist(RegisterSource source) {
+        try {
+            RegisterSource dbSource = registerDAO.get(modelName, source.id());
+            if (Objects.nonNull(dbSource)) {
+                // Already registered: merge the incoming data into the stored record.
+                dbSource.combine(source);
+                registerDAO.forceUpdate(modelName, dbSource);
+            } else {
+                // Not registered yet: take the sequence from the DAO and insert.
+                int sequence = registerDAO.max(modelName);
+                source.setSequence(sequence);
+                registerDAO.forceInsert(modelName, source);
+            }
+        } catch (Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
similarity index 53%
rename from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
rename to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
index fc9b48d..f8d14dd 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractRemoteWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
@@ -16,11 +16,10 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.worker;
+package org.apache.skywalking.oap.server.core.register.worker;
 
 import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
-import 
org.apache.skywalking.oap.server.core.worker.annotation.WorkerAnnotationContainer;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
 import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
 import org.apache.skywalking.oap.server.core.remote.selector.Selector;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
@@ -30,35 +29,24 @@ import org.slf4j.*;
 /**
  * @author peng-yongsheng
  */
-public abstract class AbstractRemoteWorker<INPUT extends Indicator> extends 
AbstractWorker<INPUT> {
+public class RegisterRemoteWorker extends AbstractWorker<RegisterSource> {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(AbstractRemoteWorker.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(RegisterRemoteWorker.class);
 
-    private final ModuleManager moduleManager;
-    private RemoteSenderService remoteSender;
-    private WorkerAnnotationContainer workerMapper;
+    private final AbstractWorker<RegisterSource> nextWorker;
+    private final RemoteSenderService remoteSender;
 
-    public AbstractRemoteWorker(ModuleManager moduleManager) {
-        this.moduleManager = moduleManager;
+    RegisterRemoteWorker(int workerId, ModuleManager moduleManager, 
AbstractWorker<RegisterSource> nextWorker) {
+        super(workerId);
+        this.remoteSender = 
moduleManager.find(CoreModule.NAME).getService(RemoteSenderService.class);
+        this.nextWorker = nextWorker;
     }
 
-    @Override public final void in(INPUT input) {
-        if (remoteSender == null) {
-            remoteSender = 
moduleManager.find(CoreModule.NAME).getService(RemoteSenderService.class);
-        }
-        if (workerMapper == null) {
-            workerMapper = 
moduleManager.find(CoreModule.NAME).getService(WorkerAnnotationContainer.class);
-        }
-
+    @Override public final void in(RegisterSource indicator) {
         try {
-            int nextWorkerId = workerMapper.findIdByClass(nextWorkerClass());
-            remoteSender.send(nextWorkerId, input, selector());
+            remoteSender.send(nextWorker.getWorkerId(), indicator, 
Selector.ForeverFirst);
         } catch (Throwable e) {
             logger.error(e.getMessage(), e);
         }
     }
-
-    public abstract Class nextWorkerClass();
-
-    public abstract Selector selector();
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
index 9b94be8..16bcccf 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
@@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
 import 
org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
-import 
org.apache.skywalking.oap.server.core.worker.annotation.WorkerClassGetter;
+import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
 import org.slf4j.*;
@@ -38,7 +38,6 @@ public class RemoteServiceHandler extends 
RemoteServiceGrpc.RemoteServiceImplBas
 
     private final ModuleManager moduleManager;
     private StreamDataClassGetter streamDataClassGetter;
-    private WorkerClassGetter workerClassGetter;
 
     public RemoteServiceHandler(ModuleManager moduleManager) {
         this.moduleManager = moduleManager;
@@ -48,9 +47,6 @@ public class RemoteServiceHandler extends 
RemoteServiceGrpc.RemoteServiceImplBas
         if (Objects.isNull(streamDataClassGetter)) {
             streamDataClassGetter = 
moduleManager.find(CoreModule.NAME).getService(StreamDataClassGetter.class);
         }
-        if (Objects.isNull(streamDataClassGetter)) {
-            workerClassGetter = 
moduleManager.find(CoreModule.NAME).getService(WorkerClassGetter.class);
-        }
 
         return new StreamObserver<RemoteMessage>() {
             @Override public void onNext(RemoteMessage message) {
@@ -62,7 +58,7 @@ public class RemoteServiceHandler extends 
RemoteServiceGrpc.RemoteServiceImplBas
                 try {
                     StreamData streamData = streamDataClass.newInstance();
                     streamData.deserialize(remoteData);
-                    
workerClassGetter.getClassById(nextWorkerId).newInstance().in(streamData);
+                    WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData);
                 } catch (InstantiationException | IllegalAccessException e) {
                     logger.warn(e.getMessage());
                 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 361c982..9de03ca 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -20,8 +20,8 @@ package org.apache.skywalking.oap.server.core.remote.client;
 
 import java.util.*;
 import java.util.concurrent.*;
-import 
org.apache.skywalking.oap.server.core.remote.annotation.StreamDataAnnotationContainer;
 import org.apache.skywalking.oap.server.core.cluster.*;
+import 
org.apache.skywalking.oap.server.core.remote.annotation.StreamDataAnnotationContainer;
 import org.apache.skywalking.oap.server.library.module.*;
 import org.slf4j.*;
 
@@ -96,7 +96,7 @@ public class RemoteClientManager implements Service {
                 client = currentClientsMap.get(address);
             } else {
                 if (remoteInstance.isSelf()) {
-                    client = new SelfRemoteClient(moduleManager, 
remoteInstance.getHost(), remoteInstance.getPort());
+                    client = new SelfRemoteClient(remoteInstance.getHost(), 
remoteInstance.getPort());
                 } else {
                     client = new GRPCRemoteClient(indicatorMapper, 
remoteInstance, 1, 3000);
                 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
index 9e0ba73..9b4f6bd 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
@@ -18,22 +18,18 @@
 
 package org.apache.skywalking.oap.server.core.remote.client;
 
-import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import 
org.apache.skywalking.oap.server.core.worker.annotation.WorkerAnnotationContainer;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
 
 /**
  * @author peng-yongsheng
  */
 public class SelfRemoteClient implements RemoteClient {
 
-    private final ModuleManager moduleManager;
     private final String host;
     private final int port;
 
-    public SelfRemoteClient(ModuleManager moduleManager, String host, int 
port) {
-        this.moduleManager = moduleManager;
+    public SelfRemoteClient(String host, int port) {
         this.host = host;
         this.port = port;
     }
@@ -47,7 +43,6 @@ public class SelfRemoteClient implements RemoteClient {
     }
 
     @Override public void push(int nextWorkerId, StreamData streamData) {
-        WorkerAnnotationContainer workerMapper = 
moduleManager.find(CoreModule.NAME).getService(WorkerAnnotationContainer.class);
-        workerMapper.findInstanceById(nextWorkerId).in(streamData);
+        WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData);
     }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
index e744a48..3b30285 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
@@ -19,7 +19,6 @@
 package org.apache.skywalking.oap.server.core.source;
 
 import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
  * @author peng-yongsheng
@@ -28,8 +27,8 @@ public class SourceReceiverImpl implements SourceReceiver {
 
     private final DispatcherManager dispatcherManager;
 
-    public SourceReceiverImpl(ModuleManager moduleManager) {
-        this.dispatcherManager = new DispatcherManager(moduleManager);
+    public SourceReceiverImpl() {
+        this.dispatcherManager = new DispatcherManager();
     }
 
     @Override public void receive(Source source) {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IIndicatorDAO.java
similarity index 72%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IIndicatorDAO.java
index 2a9d1f9..3dbf348 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IIndicatorDAO.java
@@ -24,13 +24,13 @@ import 
org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
 /**
  * @author peng-yongsheng
  */
-public interface IPersistenceDAO<INSERT, UPDATE, INPUT extends Indicator> 
extends DAO {
+public interface IIndicatorDAO<INSERT, UPDATE> extends DAO {
 
-    INPUT get(INPUT input) throws IOException;
+    Indicator get(String modelName, Indicator indicator) throws IOException;
 
-    INSERT prepareBatchInsert(INPUT input) throws IOException;
+    INSERT prepareBatchInsert(String modelName, Indicator indicator) throws 
IOException;
 
-    UPDATE prepareBatchUpdate(INPUT input) throws IOException;
+    UPDATE prepareBatchUpdate(String modelName, Indicator indicator) throws 
IOException;
 
-    void deleteHistory(Long timeBucketBefore);
+    void deleteHistory(String modelName, Long timeBucketBefore);
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterDAO.java
similarity index 69%
rename from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
rename to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterDAO.java
index 2a9d1f9..18350a9 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterDAO.java
@@ -19,18 +19,18 @@
 package org.apache.skywalking.oap.server.core.storage;
 
 import java.io.IOException;
-import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
 
 /**
  * @author peng-yongsheng
  */
-public interface IPersistenceDAO<INSERT, UPDATE, INPUT extends Indicator> 
extends DAO {
+public interface IRegisterDAO extends DAO {
 
-    INPUT get(INPUT input) throws IOException;
+    int max(String modelName) throws IOException;
 
-    INSERT prepareBatchInsert(INPUT input) throws IOException;
+    RegisterSource get(String modelName, String id) throws IOException;
 
-    UPDATE prepareBatchUpdate(INPUT input) throws IOException;
+    void forceInsert(String modelName, RegisterSource source) throws 
IOException;
 
-    void deleteHistory(Long timeBucketBefore);
+    void forceUpdate(String modelName, RegisterSource source) throws 
IOException;
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/Worker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageBuilder.java
similarity index 78%
rename from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/Worker.java
rename to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageBuilder.java
index bc4c495..faf41dd 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/Worker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageBuilder.java
@@ -16,14 +16,16 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.worker.annotation;
+package org.apache.skywalking.oap.server.core.storage;
 
-import java.lang.annotation.*;
+import java.util.Map;
 
 /**
  * @author peng-yongsheng
  */
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.RUNTIME)
-public @interface Worker {
+public interface StorageBuilder<T extends StorageData> {
+
+    T map2Data(Map<String, Object> dbMap);
+
+    Map<String, Object> data2Map(T storageData);
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerClassGetter.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
similarity index 78%
rename from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerClassGetter.java
rename to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
index 83b5b8d..b6cf4d6 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerClassGetter.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
@@ -16,14 +16,15 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.worker.annotation;
+package org.apache.skywalking.oap.server.core.storage;
 
-import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
 import org.apache.skywalking.oap.server.library.module.Service;
 
 /**
  * @author peng-yongsheng
  */
-public interface WorkerClassGetter extends Service {
-    Class<AbstractWorker> getClassById(int workerId);
+public interface StorageDAO extends Service {
+
+    IIndicatorDAO newIndicatorDao(StorageBuilder<Indicator> storageBuilder);
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageData.java
similarity index 85%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageData.java
index fd73d4c..d7de8b8 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageData.java
@@ -16,12 +16,11 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.worker;
+package org.apache.skywalking.oap.server.core.storage;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class AbstractWorker<INPUT> {
-
-    public abstract void in(INPUT input);
+public interface StorageData {
+    String id();
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
index 62eb72d..2e4beeb 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
@@ -32,6 +32,6 @@ public class StorageModule extends ModuleDefine {
     }
 
     @Override public Class[] services() {
-        return new Class[] {IBatchDAO.class, IPersistenceDAO.class, 
IRegisterLockDAO.class};
+        return new Class[] {IBatchDAO.class, StorageDAO.class, 
IRegisterLockDAO.class};
     }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
index aa6828b..adbf45e 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
@@ -23,8 +23,12 @@ import java.lang.annotation.*;
 /**
  * @author peng-yongsheng
  */
-@Target(ElementType.FIELD)
+@Target({ElementType.FIELD})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface Column {
     String columnName();
+
+    boolean matchQuery() default false;
+
+    boolean termQuery() default true;
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntity.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Query.java
similarity index 86%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntity.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Query.java
index cfc7ce9..5f5de01 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntity.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Query.java
@@ -18,13 +18,9 @@
 
 package org.apache.skywalking.oap.server.core.storage.annotation;
 
-import java.lang.annotation.*;
-
 /**
  * @author peng-yongsheng
  */
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.RUNTIME)
-public @interface StorageEntity {
-    String name();
+public enum Query {
+    Term, Match
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageAnnotationListener.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageAnnotationListener.java
index 894326c..fe134aa 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageAnnotationListener.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageAnnotationListener.java
@@ -49,8 +49,8 @@ public class StorageAnnotationListener implements 
AnnotationListener, IModelGett
         List<ModelColumn> modelColumns = new LinkedList<>();
         retrieval(aClass, modelColumns);
 
-        StorageEntity annotation = 
(StorageEntity)aClass.getAnnotation(StorageEntity.class);
-        models.add(new Model(annotation.name(), modelColumns));
+        String modelName = StorageEntityAnnotationUtils.getModelName(aClass);
+        models.add(new Model(modelName, modelColumns));
     }
 
     private void retrieval(Class clazz, List<ModelColumn> modelColumns) {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntity.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntity.java
index cfc7ce9..7bf8cab 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntity.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntity.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.storage.annotation;
 
 import java.lang.annotation.*;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
 
 /**
  * @author peng-yongsheng
@@ -27,4 +28,6 @@ import java.lang.annotation.*;
 @Retention(RetentionPolicy.RUNTIME)
 public @interface StorageEntity {
     String name();
+
+    Class<? extends StorageBuilder> builder();
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntityAnnotationUtils.java
similarity index 51%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntityAnnotationUtils.java
index aa6828b..b754039 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntityAnnotationUtils.java
@@ -18,13 +18,29 @@
 
 package org.apache.skywalking.oap.server.core.storage.annotation;
 
-import java.lang.annotation.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
 
 /**
  * @author peng-yongsheng
  */
-@Target(ElementType.FIELD)
-@Retention(RetentionPolicy.RUNTIME)
-public @interface Column {
-    String columnName();
+public class StorageEntityAnnotationUtils {
+
+    /**
+     * Returns the model name declared by the {@link StorageEntity} annotation on the class.
+     *
+     * @param aClass the class expected to carry {@link StorageEntity}
+     * @return the annotation's {@code name()} value
+     * @throws UnexpectedException if the class is not annotated with {@link StorageEntity}
+     */
+    public static String getModelName(Class aClass) {
+        return requireStorageEntity(aClass).name();
+    }
+
+    /**
+     * Returns the builder class declared by the {@link StorageEntity} annotation on the class.
+     *
+     * @param aClass the class expected to carry {@link StorageEntity}
+     * @return the annotation's {@code builder()} value
+     * @throws UnexpectedException if the class is not annotated with {@link StorageEntity}
+     */
+    public static Class<? extends StorageBuilder> getBuilder(Class aClass) {
+        return requireStorageEntity(aClass).builder();
+    }
+
+    /**
+     * Looks up the {@link StorageEntity} annotation, failing with a message that names the
+     * offending class when it is missing.
+     */
+    private static StorageEntity requireStorageEntity(Class aClass) {
+        if (aClass.isAnnotationPresent(StorageEntity.class)) {
+            return (StorageEntity)aClass.getAnnotation(StorageEntity.class);
+        }
+        throw new UnexpectedException("Class " + aClass.getName() + " is not annotated with @StorageEntity.");
+    }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerDefineLoadException.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/IEndpointCacheDAO.java
similarity index 73%
rename from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerDefineLoadException.java
rename to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/IEndpointCacheDAO.java
index 1b24afa..e010e77 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerDefineLoadException.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/IEndpointCacheDAO.java
@@ -16,14 +16,17 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.worker.annotation;
+package org.apache.skywalking.oap.server.core.storage.cache;
+
+import org.apache.skywalking.oap.server.core.register.endpoint.Endpoint;
+import org.apache.skywalking.oap.server.core.storage.DAO;
 
 /**
  * @author peng-yongsheng
  */
-public class WorkerDefineLoadException extends RuntimeException {
+public interface IEndpointCacheDAO extends DAO {
+
+    int get(String id);
 
-    public WorkerDefineLoadException(String message, Throwable cause) {
-        super(message, cause);
-    }
+    Endpoint get(int sequence);
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
index fd73d4c..c079a11 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
@@ -18,10 +18,18 @@
 
 package org.apache.skywalking.oap.server.core.worker;
 
+import lombok.Getter;
+
 /**
  * @author peng-yongsheng
  */
 public abstract class AbstractWorker<INPUT> {
 
+    @Getter private final int workerId;
+
+    public AbstractWorker(int workerId) {
+        this.workerId = workerId;
+    }
+
     public abstract void in(INPUT input);
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerIdGenerator.java
similarity index 85%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerIdGenerator.java
index fd73d4c..fc7f5af 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerIdGenerator.java
@@ -21,7 +21,12 @@ package org.apache.skywalking.oap.server.core.worker;
 /**
  * @author peng-yongsheng
  */
-public abstract class AbstractWorker<INPUT> {
+public enum WorkerIdGenerator {
+    INSTANCES;
 
-    public abstract void in(INPUT input);
+    // Next id to hand out; starts at 0 and is only modified inside generate().
+    private int workerId = 0;
+
+    /**
+     * Returns the current counter value and then increments it, so successive calls
+     * yield 0, 1, 2, ... The method is synchronized on the enum constant.
+     *
+     * @return the next worker id
+     */
+    public synchronized int generate() {
+        return workerId++;
+    }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstances.java
similarity index 72%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstances.java
index fd73d4c..a80dbd5 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstances.java
@@ -18,10 +18,21 @@
 
 package org.apache.skywalking.oap.server.core.worker;
 
+import java.util.*;
+
 /**
  * @author peng-yongsheng
  */
-public abstract class AbstractWorker<INPUT> {
+public enum WorkerInstances {
+    // Single enum constant: one shared registry of worker id -> worker instance.
+    INSTANCES;
+
+    // NOTE(review): plain HashMap with no synchronization in this class - confirm that
+    // all put() calls finish before any concurrent get() calls happen.
+    private Map<Integer, AbstractWorker> instances = new HashMap<>();
+
+    /**
+     * Stores the worker under the given id, replacing any instance already stored for that id.
+     *
+     * @param workerId key to register the worker under
+     * @param instance worker to register
+     */
+    public void put(int workerId, AbstractWorker instance) {
+        instances.put(workerId, instance);
+    }
 
-    public abstract void in(INPUT input);
+    /**
+     * @param workerId key to look up
+     * @return the worker stored under the given id, or null when none has been stored
+     */
+    public AbstractWorker get(int workerId) {
+        return instances.get(workerId);
+    }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerAnnotationContainer.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerAnnotationContainer.java
deleted file mode 100644
index c06cabc..0000000
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/annotation/WorkerAnnotationContainer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.worker.annotation;
-
-import java.lang.reflect.Constructor;
-import java.util.*;
-import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.slf4j.*;
-
-/**
- * @author peng-yongsheng
- */
-public class WorkerAnnotationContainer implements WorkerClassGetter {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(WorkerAnnotationContainer.class);
-
-    private int id = 0;
-    private final Map<Class<AbstractWorker>, Integer> classKeyMapping;
-    private final Map<Integer, Class<AbstractWorker>> idKeyMapping;
-    private final Map<Class<AbstractWorker>, AbstractWorker> 
classKeyInstanceMapping;
-    private final Map<Integer, AbstractWorker> idKeyInstanceMapping;
-
-    public WorkerAnnotationContainer() {
-        this.classKeyMapping = new HashMap<>();
-        this.idKeyMapping = new HashMap<>();
-        this.classKeyInstanceMapping = new HashMap<>();
-        this.idKeyInstanceMapping = new HashMap<>();
-    }
-
-    @SuppressWarnings(value = "unchecked")
-    public void load(ModuleManager moduleManager, List<Class> workerClasses) 
throws WorkerDefineLoadException {
-        if (Objects.isNull(workerClasses)) {
-            return;
-        }
-
-        try {
-            for (Class workerClass : workerClasses) {
-                id++;
-                classKeyMapping.put(workerClass, id);
-                idKeyMapping.put(id, workerClass);
-
-                Constructor<AbstractWorker> constructor = 
workerClass.getDeclaredConstructor(ModuleManager.class);
-                AbstractWorker worker = constructor.newInstance(moduleManager);
-                classKeyInstanceMapping.put(workerClass, worker);
-                idKeyInstanceMapping.put(id, worker);
-            }
-        } catch (Throwable t) {
-            throw new WorkerDefineLoadException(t.getMessage(), t);
-        }
-    }
-
-    @Override public Class<AbstractWorker> getClassById(int workerId) {
-        return idKeyMapping.get(id);
-    }
-
-    public int findIdByClass(Class workerClass) {
-        return classKeyMapping.get(workerClass);
-    }
-
-    public AbstractWorker findInstanceByClass(Class workerClass) {
-        return classKeyInstanceMapping.get(workerClass);
-    }
-
-    public AbstractWorker findInstanceById(int id) {
-        return idKeyInstanceMapping.get(id);
-    }
-}
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
index fae8893..757215d 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/TestAvgIndicator.java
@@ -18,9 +18,8 @@
 
 package org.apache.skywalking.oap.server.core.analysis.indicator.define;
 
-import java.util.Map;
 import lombok.*;
-import org.apache.skywalking.oap.server.core.analysis.indicator.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 
 /**
@@ -34,22 +33,10 @@ public class TestAvgIndicator extends AvgIndicator {
         return null;
     }
 
-    @Override public String name() {
-        return null;
-    }
-
     @Override public void deserialize(RemoteData remoteData) {
     }
 
     @Override public String id() {
         return null;
     }
-
-    @Override public Map<String, Object> toMap() {
-        return null;
-    }
-
-    @Override public Indicator newOne(Map<String, Object> dbMap) {
-        return null;
-    }
 }
diff --git 
a/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def 
b/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def
deleted file mode 100644
index 97491ff..0000000
--- a/oap-server/server-core/src/test/resources/META-INF/defines/indicator.def
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-
-org.apache.skywalking.oap.server.core.analysis.indicator.define.TestAvgIndicator
\ No newline at end of file
diff --git 
a/oap-server/server-core/src/test/resources/META-INF/defines/worker.def 
b/oap-server/server-core/src/test/resources/META-INF/defines/worker.def
deleted file mode 100644
index 33ebbb1..0000000
--- a/oap-server/server-core/src/test/resources/META-INF/defines/worker.def
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
\ No newline at end of file
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index db1db63..e11c4a6 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -63,7 +63,7 @@ public class StorageModuleElasticsearchProvider extends 
ModuleProvider {
         elasticSearchClient = new 
ElasticSearchClient(config.getClusterNodes(), nameSpace);
 
         this.registerServiceImplementation(IBatchDAO.class, new 
BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), 
config.getBulkSize(), config.getFlushInterval(), 
config.getConcurrentRequests()));
-        this.registerServiceImplementation(IPersistenceDAO.class, new 
PersistenceEsDAO(elasticSearchClient, nameSpace));
+        this.registerServiceImplementation(StorageDAO.class, new 
StorageEsDAO(elasticSearchClient));
         this.registerServiceImplementation(IRegisterLockDAO.class, new 
RegisterLockDAOImpl(elasticSearchClient, 1000));
     }
 
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
index dd0a70b..e7c8138 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
@@ -18,54 +18,15 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
-import java.io.IOException;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.metrics.max.Max;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
  */
 public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
 
-    private static final Logger logger = LoggerFactory.getLogger(EsDAO.class);
-
     public EsDAO(ElasticSearchClient client) {
         super(client);
     }
-
-    protected final int getMaxId(String indexName, String columnName) {
-        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
-        
searchSourceBuilder.aggregation(AggregationBuilders.max("agg").field(columnName));
-        searchSourceBuilder.size(0);
-        return getResponse(indexName, searchSourceBuilder);
-    }
-
-    protected final int getMinId(String indexName, String columnName) {
-        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
-        
searchSourceBuilder.aggregation(AggregationBuilders.min("agg").field(columnName));
-        searchSourceBuilder.size(0);
-        return getResponse(indexName, searchSourceBuilder);
-    }
-
-    private int getResponse(String indexName, SearchSourceBuilder 
searchSourceBuilder) {
-        try {
-            SearchResponse searchResponse = getClient().search(indexName, 
searchSourceBuilder);
-            Max agg = searchResponse.getAggregations().get("agg");
-
-            int id = (int)agg.getValue();
-            if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
-                return 0;
-            } else {
-                return id;
-            }
-        } catch (IOException e) {
-            logger.error(e.getMessage(), e);
-        }
-        return 0;
-    }
 }
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndicatorEsDAO.java
similarity index 60%
rename from 
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java
rename to 
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndicatorEsDAO.java
index 1f83b63..37b4d47 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/IndicatorEsDAO.java
@@ -21,8 +21,7 @@ package 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
-import org.apache.skywalking.oap.server.core.storage.IPersistenceDAO;
-import org.apache.skywalking.oap.server.library.client.NameSpace;
+import org.apache.skywalking.oap.server.core.storage.*;
 import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequest;
@@ -32,48 +31,46 @@ import org.elasticsearch.common.xcontent.*;
 /**
  * @author peng-yongsheng
  */
-public class PersistenceEsDAO implements IPersistenceDAO<IndexRequest, 
UpdateRequest, Indicator> {
+public class IndicatorEsDAO extends EsDAO implements 
IIndicatorDAO<IndexRequest, UpdateRequest> {
+    // Indicator DAO backed by the Elasticsearch client held by EsDAO. The caller supplies
+    // the model name on every call; the storage builder converts indicators to and from maps.
 
-    private final ElasticSearchClient client;
-    private final NameSpace nameSpace;
+    // Converts between Indicator objects and the Map form read from / written to storage.
+    private final StorageBuilder<Indicator> storageBuilder;
 
-    public PersistenceEsDAO(ElasticSearchClient client, NameSpace nameSpace) {
-        this.client = client;
-        this.nameSpace = nameSpace;
+    /**
+     * @param client client passed to the EsDAO base class
+     * @param storageBuilder converter used by every read and write in this DAO
+     */
+    public IndicatorEsDAO(ElasticSearchClient client, 
StorageBuilder<Indicator> storageBuilder) {
+        super(client);
+        this.storageBuilder = storageBuilder;
     }
 
-    @Override public Indicator get(Indicator input) throws IOException {
-        GetResponse response = client.get(nameSpace.getNameSpace() + "_" + 
input.name(), input.id());
+    /**
+     * Fetches the document whose id equals {@code indicator.id()} from the given model.
+     *
+     * @return the indicator rebuilt from the stored source, or null when no such document exists
+     */
+    @Override public Indicator get(String modelName, Indicator indicator) 
throws IOException {
+        GetResponse response = getClient().get(modelName, indicator.id());
         if (response.isExists()) {
-            return input.newOne(response.getSource());
+            return storageBuilder.map2Data(response.getSource());
         } else {
             return null;
         }
     }
 
-    @Override public IndexRequest prepareBatchInsert(Indicator input) throws 
IOException {
-        Map<String, Object> objectMap = input.toMap();
+    /**
+     * Serializes the indicator's map form into a JSON object and asks the client to
+     * prepare an insert request for it under {@code indicator.id()}.
+     *
+     * @return the request object produced by the client's prepareInsert
+     */
+    @Override public IndexRequest prepareBatchInsert(String modelName, 
Indicator indicator) throws IOException {
+        Map<String, Object> objectMap = storageBuilder.data2Map(indicator);
 
         XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
         for (String key : objectMap.keySet()) {
             builder.field(key, objectMap.get(key));
         }
         builder.endObject();
-        return client.prepareInsert(nameSpace.getNameSpace() + "_" + 
input.name(), input.id(), builder);
+        return getClient().prepareInsert(modelName, indicator.id(), builder);
     }
 
-    @Override public UpdateRequest prepareBatchUpdate(Indicator input) throws 
IOException {
-        Map<String, Object> objectMap = input.toMap();
+    /**
+     * Same serialization as prepareBatchInsert, but asks the client to prepare an
+     * update request instead.
+     *
+     * @return the request object produced by the client's prepareUpdate
+     */
+    @Override public UpdateRequest prepareBatchUpdate(String modelName, 
Indicator indicator) throws IOException {
+        Map<String, Object> objectMap = storageBuilder.data2Map(indicator);
 
         XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
         for (String key : objectMap.keySet()) {
             builder.field(key, objectMap.get(key));
         }
         builder.endObject();
-        return client.prepareUpdate(nameSpace.getNameSpace() + "_" + 
input.name(), input.id(), builder);
+        return getClient().prepareUpdate(modelName, indicator.id(), builder);
     }
 
-    @Override public void deleteHistory(Long timeBucketBefore) {
-
+    // Currently a no-op: the body is empty, so nothing is deleted.
+    @Override public void deleteHistory(String modelName, Long 
timeBucketBefore) {
     }
 }
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java
new file mode 100644
index 0000000..13c6711
--- /dev/null
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.oap.server.core.storage.*;
+import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.xcontent.*;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.metrics.max.Max;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RegisterEsDAO extends EsDAO implements IRegisterDAO {
+    // Register DAO backed by the Elasticsearch client held by EsDAO. The caller supplies
+    // the model name on every call; the storage builder converts sources to and from maps.
+
+    private static final Logger logger = LoggerFactory.getLogger(RegisterEsDAO.class);
+
+    private final StorageBuilder<RegisterSource> storageBuilder;
+
+    /**
+     * @param client client passed to the EsDAO base class
+     * @param storageBuilder converter between register sources and their stored map form
+     */
+    public RegisterEsDAO(ElasticSearchClient client, StorageBuilder<RegisterSource> storageBuilder) {
+        super(client);
+        this.storageBuilder = storageBuilder;
+    }
+
+    /**
+     * Fetches the document with the given id from the given model.
+     *
+     * @return the register source rebuilt from the stored document, or null when it does not exist
+     */
+    @Override public RegisterSource get(String modelName, String id) throws IOException {
+        GetResponse response = getClient().get(modelName, id);
+        if (response.isExists()) {
+            return storageBuilder.map2Data(response.getSource());
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Serializes the source and passes it to the client's forceInsert under {@code source.id()}.
+     */
+    @Override public void forceInsert(String modelName, RegisterSource source) throws IOException {
+        XContentBuilder builder = toJsonObject(source);
+        getClient().forceInsert(modelName, source.id(), builder);
+    }
+
+    /**
+     * Serializes the source and passes it to the client's forceUpdate under {@code source.id()}.
+     */
+    @Override public void forceUpdate(String modelName, RegisterSource source) throws IOException {
+        XContentBuilder builder = toJsonObject(source);
+        getClient().forceUpdate(modelName, source.id(), builder);
+    }
+
+    /**
+     * Runs a max aggregation over the sequence field of the given model.
+     *
+     * @return the largest sequence found, or 0 when the aggregation yields no usable value
+     */
+    @Override public int max(String modelName) throws IOException {
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.aggregation(AggregationBuilders.max(RegisterSource.SEQUENCE).field(RegisterSource.SEQUENCE));
+        searchSourceBuilder.size(0);
+        return getResponse(modelName, searchSourceBuilder);
+    }
+
+    /** Writes every entry of the source's map form as a field of one closed JSON object. */
+    private XContentBuilder toJsonObject(RegisterSource source) throws IOException {
+        Map<String, Object> objectMap = storageBuilder.data2Map(source);
+
+        XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+        for (Map.Entry<String, Object> entry : objectMap.entrySet()) {
+            builder.field(entry.getKey(), entry.getValue());
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    /** Executes the search and extracts the sequence aggregation as an int. */
+    private int getResponse(String modelName, SearchSourceBuilder searchSourceBuilder) throws IOException {
+        SearchResponse searchResponse = getClient().search(modelName, searchSourceBuilder);
+        Max agg = searchResponse.getAggregations().get(RegisterSource.SEQUENCE);
+
+        // A double-to-int cast saturates at the int bounds, so an infinite aggregation value
+        // lands on MIN_VALUE/MAX_VALUE. Presumably this is what an index without documents
+        // produces - TODO confirm against the Elasticsearch max aggregation behaviour.
+        int id = (int)agg.getValue();
+        if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
+            return 0;
+        } else {
+            return id;
+        }
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
similarity index 58%
rename from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
rename to 
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
index 99a8239..641582f 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
@@ -16,23 +16,22 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.endpoint;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
-import 
org.apache.skywalking.oap.server.core.analysis.worker.AbstractPersistentWorker;
-import org.apache.skywalking.oap.server.core.worker.annotation.Worker;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.storage.*;
+import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 
 /**
  * @author peng-yongsheng
  */
-@Worker
-public class EndpointLatencyAvgPersistentWorker extends 
AbstractPersistentWorker<EndpointLatencyAvgIndicator> {
+public class StorageEsDAO extends EsDAO implements StorageDAO {
+    // StorageDAO implementation that creates DAOs sharing this object's Elasticsearch client.
 
-    public EndpointLatencyAvgPersistentWorker(ModuleManager moduleManager) {
-        super(moduleManager);
+    /**
+     * @param client client passed to the EsDAO base class and reused by the DAOs created here
+     */
+    public StorageEsDAO(ElasticSearchClient client) {
+        super(client);
     }
 
-    @Override protected boolean needMergeDBData() {
-        return true;
+    /**
+     * Creates a new IndicatorEsDAO on every call, wired with this DAO's client and the
+     * given storage builder.
+     */
+    @Override public IIndicatorDAO newIndicatorDao(StorageBuilder<Indicator> 
storageBuilder) {
+        return new IndicatorEsDAO(getClient(), storageBuilder);
     }
 }
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/EndpointCacheEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/EndpointCacheEsDAO.java
new file mode 100644
index 0000000..8645bc8
--- /dev/null
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/EndpointCacheEsDAO.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache;
+
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.oap.server.core.register.endpoint.Endpoint;
+import org.apache.skywalking.oap.server.core.storage.cache.IEndpointCacheDAO;
+import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.elasticsearch.action.get.GetResponse;
+
+/**
+ * @author peng-yongsheng
+ */
+public class EndpointCacheEsDAO extends EsDAO implements IEndpointCacheDAO {
+    // Endpoint cache lookups backed by the Elasticsearch client held by EsDAO.
+
+    /**
+     * @param client client passed to the EsDAO base class
+     */
+    public EndpointCacheEsDAO(ElasticSearchClient client) {
+        super(client);
+    }
+
+    /**
+     * Looks up the sequence stored in the document with the given id.
+     *
+     * @param id document id to fetch
+     * @return the stored sequence, or 0 when the document is missing, carries no numeric
+     * sequence, or the lookup fails
+     */
+    @Override public int get(String id) {
+        try {
+            // NOTE(review): the index name is an empty string; presumably the endpoint
+            // model name belongs here - confirm against the caller.
+            GetResponse response = getClient().get("", id);
+            if (!response.isExists()) {
+                return 0;
+            }
+
+            // Read from the document source, as the other DAOs in this plugin do.
+            // getField() only exposes stored fields explicitly requested on the get request.
+            Object sequence = response.getSource().get(RegisterSource.SEQUENCE);
+            return sequence instanceof Number ? ((Number)sequence).intValue() : 0;
+        } catch (Throwable e) {
+            // Best-effort lookup: any failure is reported as "not found" (0).
+            // NOTE(review): the failure is not logged, so it cannot be told apart from a
+            // missing document - consider logging it.
+            return 0;
+        }
+    }
+
+    /**
+     * Not implemented yet: always returns null.
+     */
+    @Override public Endpoint get(int sequence) {
+        return null;
+    }
+}

Reply via email to