This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch helix-gateway-service in repository https://gitbox.apache.org/repos/asf/helix.git
commit 446f8512176755685c109321f900772920782a43 Author: xyuanlu <xyua...@gmail.com> AuthorDate: Wed Jul 17 16:45:49 2024 -0700 Add protobuff definition and an empty grpc service (#2834) Add protobuff definition and an empty grpc service --- helix-gateway/pom.xml | 72 ++++++++++++++++++++++ .../grpcservice/HelixGatewayServiceService.java | 34 ++++++++++ .../helix/gateway/service/ClusterManager.java | 3 + .../helix/gateway/service/HelixGatewayService.java | 4 ++ .../helix/gateway/service/ReplicaStateTracker.java | 12 ++++ .../service/StateTransitionMessageTranslator.java | 4 ++ .../src/main/proto/HelixGatewayService.proto | 52 ++++++++++++++++ .../HelixGatewayService.proto | 35 ----------- 8 files changed, 181 insertions(+), 35 deletions(-) diff --git a/helix-gateway/pom.xml b/helix-gateway/pom.xml index b40de852a..f788f5bcf 100644 --- a/helix-gateway/pom.xml +++ b/helix-gateway/pom.xml @@ -43,6 +43,9 @@ </organization> <properties> + <protobuf.version>3.25.3</protobuf.version> + <protoc.version>3.25.3</protoc.version> + <grpc.version>1.65.1</grpc.version> <osgi.import> org.slf4j*;version="[1.7,2)", org.apache.logging.log4j*;version="[2.17,3)", @@ -52,6 +55,7 @@ <osgi.export>org.apache.helix*;version="${project.version};-noimport:=true</osgi.export> </properties> + <dependencies> <dependency> <groupId>org.apache.helix</groupId> @@ -82,8 +86,58 @@ <groupId>org.apache.helix</groupId> <artifactId>helix-core</artifactId> </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-services</artifactId> + <version>1.65.1</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty-shaded</artifactId> + <scope>runtime</scope> + <version>1.65.1</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + <version>1.65.1</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + <version>1.65.1</version> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java-util</artifactId> + <version>${protobuf.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tomcat</groupId> + <artifactId>annotations-api</artifactId> + <version>6.0.53</version> + <scope>provided</scope> <!-- not needed at runtime --> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-testing</artifactId> + <scope>test</scope> + <version>1.65.1</version> + </dependency> + <dependency> + <groupId>javax.annotation</groupId> + <artifactId>javax.annotation-api</artifactId> + <version>1.3.2</version> + </dependency> </dependencies> <build> + <extensions> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.6.0</version> + </extension> + </extensions> <resources> <resource> <directory>${basedir}</directory> @@ -141,6 +195,24 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <version>0.6.1</version> + <configuration> + <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact> + <pluginId>grpc-java</pluginId> + <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact> + </configuration> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>compile-custom</goal> + </goals> + </execution> + </executions> + </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java new file mode 100644 index 000000000..441fba952 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java @@ -0,0 +1,34 @@ +package org.apache.helix.gateway.grpcservice; + +import proto.org.apache.helix.gateway.*; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*; +import io.grpc.stub.StreamObserver; + +public class HelixGatewayServiceService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase { + + @Override + public StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage> report( + StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage> responseObserver) { + + return new StreamObserver<ShardStateMessage>() { + + @Override + public void onNext(ShardStateMessage request) { + // called when a client sends a message + //.... + } + + @Override + public void onError(Throwable t) { + // called when a client sends an error + //.... + } + + @Override + public void onCompleted() { + // called when the client completes + //.... + } + }; + } +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java index ad1a6eca6..80bd15aaa 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java @@ -16,6 +16,9 @@ public class ClusterManager { private Map<String, MockApplication> _channelMap; private Lock _lock = new ReentrantLock(); + // event queue + // state tracker, call tracker.update + public ClusterManager() { _flagMap = new ConcurrentHashMap<>(); _channelMap = new ConcurrentHashMap<>(); diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java index b4d21f921..4f44efcd6 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java @@ -50,4 +50,8 @@ public class HelixGatewayService { _clusterManager.removeChannel(participantName); } } + + public void stop() { + System.out.println("Stoping Helix Gateway Service"); + } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java new file mode 100644 index 000000000..1e7c16e38 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java @@ -0,0 +1,12 @@ +package org.apache.helix.gateway.service; + +public class ReplicaStateTracker { + + boolean compareTargetState(){ + return true; + } + + boolean updateReplicaState() { + return true; + } +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/StateTransitionMessageTranslator.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/StateTransitionMessageTranslator.java new file mode 100644 index 000000000..ae2dbfe94 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/StateTransitionMessageTranslator.java @@ -0,0 +1,4 @@ +package org.apache.helix.gateway.service; + +public class StateTransitionMessageTranslator { +} diff --git a/helix-gateway/src/main/proto/HelixGatewayService.proto b/helix-gateway/src/main/proto/HelixGatewayService.proto new file mode 100644 index 000000000..c82431123 --- /dev/null +++ b/helix-gateway/src/main/proto/HelixGatewayService.proto @@ -0,0 +1,52 @@ +syntax = "proto3"; + +package proto.org.apache.helix.gateway; + +message SingleTransitionMessage { + enum TransitionType { + ADD_SHARD = 0; + DELETE_SHARD = 1; + CHANGE_ROLE = 2; + } + string transitionID = 1; // ID of transition message + TransitionType transitionType = 2; // Transition type for shard operations + string resourceID = 3; // Resource ID + string shardID = 4; // Shard to perform operation + optional string startState = 5; // Shard start state + string targetState = 6; // Shard target state. +} + +message TransitionMessage{ + repeated SingleTransitionMessage request = 1; +} + +message SingleShardTransitionStatus { + string transitionID = 1; // ID of transition message + bool isSuccess = 2; // Was transition successfully performed + optional string currentState = 3; // If it failed, what is the current state it should reported as. +} + +// resource has list of replica + +message SingleResourceState { + string resource = 1; + repeated SingleShardState SingleReplicaState= 2; + +} + +message SingleShardState { + string shardaName = 1; + string currentState = 2; +} + +// +message ShardStateMessage{ + repeated SingleShardTransitionStatus replicaStateST = 1; + repeated SingleShardState shardState = 2; +} + + +service HelixGatewayService { + rpc report(stream ShardStateMessage) returns (stream TransitionMessage) {} +} + diff --git a/helix-gateway/src/main/protobuf/org.apache.helix.gateway/HelixGatewayService.proto b/helix-gateway/src/main/protobuf/org.apache.helix.gateway/HelixGatewayService.proto deleted file mode 100644 index 42347f038..000000000 --- a/helix-gateway/src/main/protobuf/org.apache.helix.gateway/HelixGatewayService.proto +++ /dev/null @@ -1,35 +0,0 @@ -syntax = "proto3"; - -package proto.org.apache.helix.gateway; - -message SingleTransitionRequest { - enum TransitionType { - ADD_SHARD = 0; - DELETE_SHARD = 1; - CHANGE_ROLE = 2; - } - string transitionID = 1; // ID of transition message - TransitionType transitionType = 2; // Transition type for shard operations - string resourceID = 3; // Resource ID - string shardID = 4; // Shard to perform operation - optional string startState = 5; // Shard start state, it is not mandatory. Application can decide how to get target state. - optional string targetState = 6; // Shard target state. -} - -message TransitionRequests { - repeated SingleTransitionRequest request = 1; -} - -message SingleTransitionResponse { - string transitionID = 1; // ID of transition message - bool isSuccess = 2; // Was transition successfully performed - optional string currentState = 3; // If it failed, what is the current state it should reported as. -} - -message TransitionResponse { - repeated SingleTransitionResponse response = 1; -} - -service GatewayService { - rpc transition(TransitionRequests) returns (TransitionResponse) {} -}