wu-sheng closed pull request #899: Collector cluster mode stream error
URL: https://github.com/apache/incubator-skywalking/pull/899
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/RegisterMock.java
 
b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/RegisterMock.java
index 0611d6e89..0eb086ee6 100644
--- 
a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/RegisterMock.java
+++ 
b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/RegisterMock.java
@@ -99,7 +99,7 @@ private void registerConsumer() throws InterruptedException {
         serviceNameElement.setSrcSpanType(SpanType.Exit);
         serviceNameCollection.addElements(serviceNameElement);
 
-        registerServiceName(serviceNameCollection);
+//        registerServiceName(serviceNameCollection);
 
         heartBeatScheduled(instanceMapping.getApplicationInstanceId());
     }
@@ -144,7 +144,7 @@ private void registerProvider() throws InterruptedException 
{
         serviceNameElement.setSrcSpanType(SpanType.Entry);
         serviceNameCollection.addElements(serviceNameElement);
 
-        registerServiceName(serviceNameCollection);
+//        registerServiceName(serviceNameCollection);
 
         heartBeatScheduled(instanceMapping.getApplicationInstanceId());
     }
diff --git 
a/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/AnalysisAlarmModuleProvider.java
 
b/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/AnalysisAlarmModuleProvider.java
index 9c560ff4a..708b93d5e 100644
--- 
a/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/AnalysisAlarmModuleProvider.java
+++ 
b/apm-collector/apm-collector-analysis/analysis-alarm/alarm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/alarm/provider/AnalysisAlarmModuleProvider.java
@@ -34,7 +34,14 @@
 import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
 import 
org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
 import org.apache.skywalking.apm.collector.remote.RemoteModule;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.StorageModule;
+import 
org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationAlarm;
+import 
org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationAlarmList;
+import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceAlarm;
+import 
org.apache.skywalking.apm.collector.storage.table.alarm.InstanceAlarmList;
+import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarm;
+import 
org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarmList;
 
 /**
  * @author peng-yongsheng
@@ -74,6 +81,8 @@
         ApplicationReferenceMetricAlarmGraph 
applicationReferenceMetricAlarmGraph = new 
ApplicationReferenceMetricAlarmGraph(getManager(), workerCreateListener);
         applicationReferenceMetricAlarmGraph.create();
 
+        registerRemoteData();
+
         PersistenceTimer persistenceTimer = new 
PersistenceTimer(AnalysisAlarmModule.NAME);
         persistenceTimer.start(getManager(), 
workerCreateListener.getPersistenceWorkers());
     }
@@ -85,4 +94,15 @@
     @Override public String[] requiredModules() {
         return new String[] {RemoteModule.NAME, AnalysisMetricModule.NAME, 
ConfigurationModule.NAME, StorageModule.NAME};
     }
+
+    private void registerRemoteData() {
+        RemoteDataRegisterService remoteDataRegisterService = 
getManager().find(RemoteModule.NAME).getService(RemoteDataRegisterService.class);
+        remoteDataRegisterService.register(ApplicationAlarm.class, new 
ApplicationAlarm.InstanceCreator());
+        remoteDataRegisterService.register(ApplicationAlarmList.class, new 
ApplicationAlarmList.InstanceCreator());
+        remoteDataRegisterService.register(InstanceAlarm.class, new 
InstanceAlarm.InstanceCreator());
+        remoteDataRegisterService.register(InstanceAlarmList.class, new 
InstanceAlarmList.InstanceCreator());
+        remoteDataRegisterService.register(ServiceAlarm.class, new 
ServiceAlarm.InstanceCreator());
+        remoteDataRegisterService.register(ServiceAlarmList.class, new 
ServiceAlarmList.InstanceCreator());
+
+    }
 }
diff --git 
a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java
 
b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java
index bf0a4a6a9..6ac9e06c8 100644
--- 
a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java
+++ 
b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java
@@ -47,6 +47,17 @@
 import org.apache.skywalking.apm.collector.core.module.Module;
 import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
 import 
org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
+import org.apache.skywalking.apm.collector.remote.RemoteModule;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
+import 
org.apache.skywalking.apm.collector.storage.table.application.ApplicationComponent;
+import 
org.apache.skywalking.apm.collector.storage.table.application.ApplicationMapping;
+import 
org.apache.skywalking.apm.collector.storage.table.application.ApplicationMetric;
+import 
org.apache.skywalking.apm.collector.storage.table.application.ApplicationReferenceMetric;
+import 
org.apache.skywalking.apm.collector.storage.table.instance.InstanceMapping;
+import 
org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric;
+import 
org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric;
+import org.apache.skywalking.apm.collector.storage.table.service.ServiceMetric;
+import 
org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
 
 /**
  * @author peng-yongsheng
@@ -74,6 +85,8 @@
 
         graphCreate(workerCreateListener);
 
+        registerRemoteData();
+
         PersistenceTimer persistenceTimer = new 
PersistenceTimer(AnalysisMetricModule.NAME);
         persistenceTimer.start(getManager(), 
workerCreateListener.getPersistenceWorkers());
     }
@@ -133,4 +146,17 @@ private void graphCreate(WorkerCreateListener 
workerCreateListener) {
         InstanceHeartBeatPersistenceGraph instanceHeartBeatPersistenceGraph = 
new InstanceHeartBeatPersistenceGraph(getManager(), workerCreateListener);
         instanceHeartBeatPersistenceGraph.create();
     }
+
+    private void registerRemoteData() {
+        RemoteDataRegisterService remoteDataRegisterService = 
getManager().find(RemoteModule.NAME).getService(RemoteDataRegisterService.class);
+        remoteDataRegisterService.register(ApplicationComponent.class, new 
ApplicationComponent.InstanceCreator());
+        remoteDataRegisterService.register(ApplicationMapping.class, new 
ApplicationMapping.InstanceCreator());
+        remoteDataRegisterService.register(ApplicationMetric.class, new 
ApplicationMetric.InstanceCreator());
+        remoteDataRegisterService.register(ApplicationReferenceMetric.class, 
new ApplicationReferenceMetric.InstanceCreator());
+        remoteDataRegisterService.register(InstanceMapping.class, new 
InstanceMapping.InstanceCreator());
+        remoteDataRegisterService.register(InstanceMetric.class, new 
InstanceMetric.InstanceCreator());
+        remoteDataRegisterService.register(InstanceReferenceMetric.class, new 
InstanceReferenceMetric.InstanceCreator());
+        remoteDataRegisterService.register(ServiceMetric.class, new 
ServiceMetric.InstanceCreator());
+        remoteDataRegisterService.register(ServiceReferenceMetric.class, new 
ServiceReferenceMetric.InstanceCreator());
+    }
 }
diff --git 
a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/AnalysisRegisterModuleProvider.java
 
b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/AnalysisRegisterModuleProvider.java
index ac4eb5a1a..f8a232355 100644
--- 
a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/AnalysisRegisterModuleProvider.java
+++ 
b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/AnalysisRegisterModuleProvider.java
@@ -39,7 +39,12 @@
 import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
 import 
org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
 import org.apache.skywalking.apm.collector.remote.RemoteModule;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.StorageModule;
+import org.apache.skywalking.apm.collector.storage.table.register.Application;
+import org.apache.skywalking.apm.collector.storage.table.register.Instance;
+import 
org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress;
+import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
 
 /**
  * @author peng-yongsheng
@@ -68,6 +73,8 @@
 
         graphCreate(workerCreateListener);
 
+        registerRemoteData();
+
         PersistenceTimer persistenceTimer = new 
PersistenceTimer(AnalysisRegisterModule.NAME);
         persistenceTimer.start(getManager(), 
workerCreateListener.getPersistenceWorkers());
     }
@@ -93,4 +100,12 @@ private void graphCreate(WorkerCreateListener 
workerCreateListener) {
         NetworkAddressRegisterGraph networkAddressRegisterGraph = new 
NetworkAddressRegisterGraph(getManager(), workerCreateListener);
         networkAddressRegisterGraph.create();
     }
+
+    private void registerRemoteData() {
+        RemoteDataRegisterService remoteDataRegisterService = 
getManager().find(RemoteModule.NAME).getService(RemoteDataRegisterService.class);
+        remoteDataRegisterService.register(Application.class, new 
Application.InstanceCreator());
+        remoteDataRegisterService.register(Instance.class, new 
Instance.InstanceCreator());
+        remoteDataRegisterService.register(NetworkAddress.class, new 
NetworkAddress.InstanceCreator());
+        remoteDataRegisterService.register(ServiceName.class, new 
ServiceName.InstanceCreator());
+    }
 }
diff --git 
a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/apache/skywalking/apm/collector/remote/service/RemoteDataRegisterService.java
 
b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/apache/skywalking/apm/collector/remote/service/RemoteDataRegisterService.java
index 943b4d31b..133413c22 100644
--- 
a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/apache/skywalking/apm/collector/remote/service/RemoteDataRegisterService.java
+++ 
b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/apache/skywalking/apm/collector/remote/service/RemoteDataRegisterService.java
@@ -28,6 +28,6 @@
     void register(Class<? extends RemoteData> dataClass, 
RemoteDataInstanceCreator instanceCreator);
 
     interface RemoteDataInstanceCreator<REMOTE_DATA extends RemoteData> {
-        REMOTE_DATA createInstance(String id);
+        REMOTE_DATA createInstance();
     }
 }
diff --git 
a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/handler/RemoteCommonServiceHandler.java
 
b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/handler/RemoteCommonServiceHandler.java
index c9cc8f53e..a2de37dc0 100644
--- 
a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/handler/RemoteCommonServiceHandler.java
+++ 
b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/handler/RemoteCommonServiceHandler.java
@@ -21,7 +21,6 @@
 import io.grpc.stub.StreamObserver;
 import org.apache.skywalking.apm.collector.core.graph.GraphManager;
 import org.apache.skywalking.apm.collector.core.graph.Next;
-import org.apache.skywalking.apm.collector.core.util.Const;
 import org.apache.skywalking.apm.collector.remote.grpc.proto.Empty;
 import 
org.apache.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc;
 import org.apache.skywalking.apm.collector.remote.grpc.proto.RemoteData;
@@ -58,7 +57,7 @@ public 
RemoteCommonServiceHandler(RemoteDataInstanceCreatorGetter instanceCreato
                 RemoteData remoteData = message.getRemoteData();
 
                 try {
-                    org.apache.skywalking.apm.collector.core.data.RemoteData 
output = 
instanceCreatorGetter.getInstanceCreator(remoteDataId).createInstance(Const.EMPTY_STRING);
+                    org.apache.skywalking.apm.collector.core.data.RemoteData 
output = 
instanceCreatorGetter.getInstanceCreator(remoteDataId).createInstance();
                     service.deserialize(remoteData, output);
                     Next next = 
GraphManager.INSTANCE.findGraph(graphId).toFinder().findNext(nodeId);
                     next.execute(output);
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java
index d8cbcc1d3..e36958bc0 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
  * @author peng-yongsheng
@@ -114,4 +116,10 @@ public String getAlarmContent() {
     public void setAlarmContent(String alarmContent) {
         setDataString(1, alarmContent);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new ApplicationAlarm();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java
index 2b68ab7d2..c9ca654f6 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
  * @author peng-yongsheng
@@ -107,4 +109,10 @@ public Long getTimeBucket() {
     public void setTimeBucket(Long timeBucket) {
         setDataLong(0, timeBucket);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new ApplicationAlarmList();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java
index 8c539159c..389ecb67c 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
  * @author peng-yongsheng
@@ -123,4 +125,10 @@ public String getAlarmContent() {
     public void setAlarmContent(String alarmContent) {
         setDataString(1, alarmContent);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new InstanceAlarm();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java
index c89b1867f..50d58ff3e 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
  * @author peng-yongsheng
@@ -115,4 +117,10 @@ public String getAlarmContent() {
     public void setAlarmContent(String alarmContent) {
         setDataString(1, alarmContent);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new InstanceAlarmList();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java
index 8071844c4..582924402 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
  * @author peng-yongsheng
@@ -132,4 +134,10 @@ public String getAlarmContent() {
     public void setAlarmContent(String alarmContent) {
         setDataString(1, alarmContent);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new ServiceAlarm();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java
index 4e059c571..86aee154c 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
  * @author peng-yongsheng
@@ -124,4 +126,10 @@ public String getAlarmContent() {
     public void setAlarmContent(String alarmContent) {
         setDataString(1, alarmContent);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new ServiceAlarmList();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java
index cf6e09e69..6c3ecb97c 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.application;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
  * @author peng-yongsheng
@@ -89,4 +91,10 @@ public Integer getApplicationId() {
     public void setApplicationId(Integer applicationId) {
         setDataInteger(1, applicationId);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new ApplicationComponent();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java
index e1b3fff77..47df1c406 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.application;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
  * @author peng-yongsheng
@@ -89,4 +91,10 @@ public long getTimeBucket() {
     public void setTimeBucket(long timeBucket) {
         setDataLong(0, timeBucket);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new ApplicationMapping();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java
index 73fe7bdb4..ed5602358 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.application;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.AddOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
 /**
@@ -254,4 +256,10 @@ public long getFrustratedCount() {
     public void setFrustratedCount(long frustratedCount) {
         setDataLong(15, frustratedCount);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new ApplicationMetric();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java
index 160c3e48f..8e00ca43e 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.application;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.AddOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
 /**
@@ -263,4 +265,10 @@ public long getFrustratedCount() {
     public void setFrustratedCount(long frustratedCount) {
         setDataLong(15, frustratedCount);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new ApplicationReferenceMetric();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java
index 602da9283..13b9208bc 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.instance;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
  * @author peng-yongsheng
@@ -98,4 +100,10 @@ public long getTimeBucket() {
     public void setTimeBucket(long timeBucket) {
         setDataLong(0, timeBucket);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new InstanceMapping();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java
index b66b0c70e..fd766f41b 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java
@@ -19,10 +19,12 @@
 package org.apache.skywalking.apm.collector.storage.table.instance;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.AddOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
 /**
@@ -237,4 +239,10 @@ public Long getMqTransactionErrorDurationSum() {
     public void setMqTransactionErrorDurationSum(Long 
mqTransactionErrorDurationSum) {
         setDataLong(12, mqTransactionErrorDurationSum);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new InstanceMetric();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java
index 888d50139..c110ec0d4 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.instance;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.AddOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
 /**
@@ -254,4 +256,10 @@ public Long getMqTransactionErrorDurationSum() {
     public void setMqTransactionErrorDurationSum(Long 
mqTransactionErrorDurationSum) {
         setDataLong(12, mqTransactionErrorDurationSum);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new InstanceReferenceMetric();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java
index f5107a163..a55965fe3 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.register;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
  * @author peng-yongsheng
@@ -105,4 +107,10 @@ public int getIsAddress() {
     public void setIsAddress(int isAddress) {
         setDataInteger(3, isAddress);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new Application();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java
index 8efe7dcff..3cada85ff 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.register;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
  * @author peng-yongsheng
@@ -142,4 +144,10 @@ public int getIsAddress() {
     public void setIsAddress(int isAddress) {
         setDataInteger(3, isAddress);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new Instance();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java
index 74dd2930c..875c093b4 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.register;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
  * @author peng-yongsheng
@@ -97,4 +99,10 @@ public Integer getServerType() {
     public void setServerType(Integer serverType) {
         setDataInteger(2, serverType);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new NetworkAddress();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java
index 1e377f1ce..adf6a1193 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.register;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
  * @author peng-yongsheng
@@ -96,4 +98,10 @@ public int getSrcSpanType() {
     public void setSrcSpanType(int srcSpanType) {
         setDataInteger(2, srcSpanType);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new ServiceName();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java
index 443b9696f..16c4b8ba5 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.service;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.AddOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
 /**
@@ -245,4 +247,10 @@ public Long getMqTransactionErrorDurationSum() {
     public void setMqTransactionErrorDurationSum(Long 
mqTransactionErrorDurationSum) {
         setDataLong(12, mqTransactionErrorDurationSum);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new ServiceMetric();
+        }
+    }
 }
diff --git 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java
 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java
index d7d54df4e..41fdf951c 100644
--- 
a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java
+++ 
b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.apm.collector.storage.table.service;
 
 import org.apache.skywalking.apm.collector.core.data.Column;
+import org.apache.skywalking.apm.collector.core.data.RemoteData;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.data.operator.AddOperation;
 import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
+import 
org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
 /**
@@ -275,4 +277,10 @@ public Long getMqTransactionErrorDurationSum() {
     public void setMqTransactionErrorDurationSum(Long 
mqTransactionErrorDurationSum) {
         setDataLong(12, mqTransactionErrorDurationSum);
     }
+
+    public static class InstanceCreator implements 
RemoteDataRegisterService.RemoteDataInstanceCreator {
+        @Override public RemoteData createInstance() {
+            return new ServiceReferenceMetric();
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to