This is an automated email from the ASF dual-hosted git repository.
kangkaisen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1ebd156 [Feature]Add fetch/update/clear proto of fe&be for cache
(#4190)
1ebd156 is described below
commit 1ebd156b99d1e1eaaedbf8ca0e9d3708438e4b11
Author: HaiBo Li <[email protected]>
AuthorDate: Fri Jul 31 13:23:24 2020 +0800
[Feature]Add fetch/update/clear proto of fe&be for cache (#4190)
---
.../org/apache/doris/rpc/BackendServiceProxy.java | 42 +++++++++++++++
.../java/org/apache/doris/rpc/PBackendService.java | 14 +++++
gensrc/proto/internal_service.proto | 62 ++++++++++++++++++++++
gensrc/proto/palo_internal_service.proto | 3 ++
4 files changed, 121 insertions(+)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index d9a90de..7a01b7a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -28,6 +28,11 @@ import org.apache.doris.proto.PProxyRequest;
import org.apache.doris.proto.PProxyResult;
import org.apache.doris.proto.PTriggerProfileReportResult;
import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.proto.PUpdateCacheRequest;
+import org.apache.doris.proto.PCacheResponse;
+import org.apache.doris.proto.PFetchCacheRequest;
+import org.apache.doris.proto.PFetchCacheResult;
+import org.apache.doris.proto.PClearCacheRequest;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUniqueId;
@@ -164,6 +169,43 @@ public class BackendServiceProxy {
}
}
+ public Future<PCacheResponse> updateCache(
+ TNetworkAddress address, PUpdateCacheRequest request) throws
RpcException{
+ try {
+ PBackendService service = getProxy(address);
+ return service.updateCache(request);
+ } catch (Throwable e) {
+ LOG.warn("update cache catch a exception, address={}:{}",
+ address.getHostname(), address.getPort(), e);
+ throw new RpcException(address.hostname, e.getMessage());
+ }
+ }
+
+ public Future<PFetchCacheResult> fetchCache(
+ TNetworkAddress address, PFetchCacheRequest request) throws
RpcException {
+ try {
+ PBackendService service = getProxy(address);
+ return service.fetchCache(request);
+ } catch (Throwable e) {
+ LOG.warn("fetch cache catch a exception, address={}:{}",
+ address.getHostname(), address.getPort(), e);
+ throw new RpcException(address.hostname, e.getMessage());
+ }
+ }
+
+ public Future<PCacheResponse> clearCache(
+ TNetworkAddress address, PClearCacheRequest request) throws
RpcException {
+ try {
+ PBackendService service = getProxy(address);
+ return service.clearCache(request);
+ } catch (Throwable e) {
+ LOG.warn("clear cache catch a exception, address={}:{}",
+ address.getHostname(), address.getPort(), e);
+ throw new RpcException(address.hostname, e.getMessage());
+ }
+ }
+
+
public Future<PTriggerProfileReportResult> triggerProfileReportAsync(
TNetworkAddress address, PTriggerProfileReportRequest request)
throws RpcException {
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java
index 38bc2e7..3e68009 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java
@@ -24,6 +24,11 @@ import org.apache.doris.proto.PFetchDataResult;
import org.apache.doris.proto.PProxyRequest;
import org.apache.doris.proto.PProxyResult;
import org.apache.doris.proto.PTriggerProfileReportResult;
+import org.apache.doris.proto.PUpdateCacheRequest;
+import org.apache.doris.proto.PClearCacheRequest;
+import org.apache.doris.proto.PCacheResponse;
+import org.apache.doris.proto.PFetchCacheRequest;
+import org.apache.doris.proto.PFetchCacheResult;
import com.baidu.jprotobuf.pbrpc.ProtobufRPC;
@@ -43,6 +48,15 @@ public interface PBackendService {
attachmentHandler = ThriftClientAttachmentHandler.class,
onceTalkTimeout = 86400000)
Future<PFetchDataResult> fetchDataAsync(PFetchDataRequest request);
+ @ProtobufRPC(serviceName = "PBackendService", methodName = "update_cache",
onceTalkTimeout = 10000)
+ Future<PCacheResponse> updateCache(PUpdateCacheRequest request);
+
+ @ProtobufRPC(serviceName = "PBackendService", methodName = "fetch_cache",
onceTalkTimeout = 10000)
+ Future<PFetchCacheResult> fetchCache(PFetchCacheRequest request);
+
+ @ProtobufRPC(serviceName = "PBackendService", methodName = "clear_cache",
onceTalkTimeout = 10000)
+ Future<PCacheResponse> clearCache(PClearCacheRequest request);
+
@ProtobufRPC(serviceName = "PBackendService", methodName =
"trigger_profile_report",
attachmentHandler = ThriftClientAttachmentHandler.class,
onceTalkTimeout = 10000)
Future<PTriggerProfileReportResult>
triggerProfileReport(PTriggerProfileReportRequest request);
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 80f3be9d..cc0d08b 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -147,6 +147,65 @@ message PFetchDataResult {
optional PQueryStatistics query_statistics = 4;
};
+//Add message definition to fetch and update cache
+enum PCacheStatus {
+ DEFAULT = 0;
+ CACHE_OK = 1;
+ PARAM_ERROR = 2;
+ SIZE_OVER_LIMIT = 3;
+ NO_SQL_KEY = 4;
+ NO_PARTITION_KEY = 5;
+ INVALID_KEY_RANGE = 6;
+ DATA_OVERDUE = 7;
+ EMPTY_DATA = 8;
+};
+
+message PCacheParam {
+ required int64 partition_key = 1;
+ optional int64 last_version = 2;
+ optional int64 last_version_time = 3;
+};
+
+message PCacheValue {
+ required PCacheParam param = 1;
+ required int32 data_size = 2;
+ repeated bytes row = 3;
+};
+
+//for update&clear return
+message PCacheResponse {
+ required PCacheStatus status = 1;
+};
+
+message PUpdateCacheRequest{
+ required PUniqueId sql_key = 1;
+ repeated PCacheValue value = 2;
+};
+
+message PFetchCacheRequest {
+ required PUniqueId sql_key = 1;
+ repeated PCacheParam param = 2;
+};
+
+message PFetchCacheResult {
+ required PCacheStatus status = 1;
+ repeated PCacheValue value = 2;
+};
+
+enum PClearType {
+ CLEAR_ALL = 0;
+ PRUNE_CACHE = 1;
+ CLEAR_BEFORE_TIME = 2;
+ CLEAR_SQL_KEY = 3;
+};
+
+message PClearCacheRequest {
+ required PClearType clear_type = 1;
+ optional int64 before_time = 2;
+ optional PUniqueId sql_key = 3;
+};
+//End cache proto definition
+
message PTriggerProfileReportRequest {
repeated PUniqueId instance_ids = 1;
};
@@ -195,5 +254,8 @@ service PBackendService {
rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns
(PTabletWriterCancelResult);
rpc trigger_profile_report(PTriggerProfileReportRequest) returns
(PTriggerProfileReportResult);
rpc get_info(PProxyRequest) returns (PProxyResult);
+ rpc update_cache(PUpdateCacheRequest) returns (PCacheResponse);
+ rpc fetch_cache(PFetchCacheRequest) returns (PFetchCacheResult);
+ rpc clear_cache(PClearCacheRequest) returns (PCacheResponse);
};
diff --git a/gensrc/proto/palo_internal_service.proto
b/gensrc/proto/palo_internal_service.proto
index 3adc1ec..31adb76 100644
--- a/gensrc/proto/palo_internal_service.proto
+++ b/gensrc/proto/palo_internal_service.proto
@@ -36,4 +36,7 @@ service PInternalService {
rpc tablet_writer_cancel(doris.PTabletWriterCancelRequest) returns
(doris.PTabletWriterCancelResult);
rpc trigger_profile_report(doris.PTriggerProfileReportRequest) returns
(doris.PTriggerProfileReportResult);
rpc get_info(doris.PProxyRequest) returns (doris.PProxyResult);
+ rpc update_cache(doris.PUpdateCacheRequest) returns (doris.PCacheResponse);
+ rpc fetch_cache(doris.PFetchCacheRequest) returns
(doris.PFetchCacheResult);
+ rpc clear_cache(doris.PClearCacheRequest) returns (doris.PCacheResponse);
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]