This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new af35841  RATIS-1116. Add DataStreamType. (#238)
af35841 is described below

commit af358415cc8ebc70665d73e5fc4812b7cb669c75
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Oct 29 21:12:04 2020 +0800

    RATIS-1116. Add DataStreamType. (#238)
    
    * RATIS-1116. Add DataStreamType.
    
    * Fix a bug and change the cast(..) method to newInstance(..).
---
 .../ratis/client/DataStreamClientFactory.java      |  9 ++---
 ...y.java => DisabledDataStreamClientFactory.java} | 32 +++++++++--------
 .../ratis/client/impl/DataStreamClientImpl.java    |  2 +-
 .../apache/ratis/datastream/DataStreamFactory.java |  4 +--
 .../DataStreamType.java}                           | 42 +++++++++++-----------
 .../ratis/datastream/SupportedDataStreamType.java  | 28 ++++++++++-----
 .../main/java/org/apache/ratis/rpc/RpcType.java    |  4 +--
 .../ratis/server/DataStreamServerFactory.java      | 10 +++---
 ...y.java => DisabledDataStreamServerFactory.java} | 22 ++----------
 .../ratis/server/impl/DataStreamServerImpl.java    |  4 +--
 .../ratis/datastream/TestDataStreamDisabled.java   |  4 ++-
 11 files changed, 81 insertions(+), 80 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
index c77fb7a..1e1ea3b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
@@ -18,22 +18,23 @@
 
 package org.apache.ratis.client;
 
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.DataStreamFactory;
+import org.apache.ratis.datastream.DataStreamType;
 import org.apache.ratis.protocol.RaftPeer;
 
 /**
  * A factory to create streaming client.
  */
 public interface DataStreamClientFactory extends DataStreamFactory {
-
-  static DataStreamClientFactory cast(DataStreamFactory dataStreamFactory) {
+  static DataStreamClientFactory newInstance(DataStreamType type, Parameters parameters) {
+    final DataStreamFactory dataStreamFactory = type.newClientFactory(parameters);
     if (dataStreamFactory instanceof DataStreamClientFactory) {
       return (DataStreamClientFactory) dataStreamFactory;
     }
     throw new ClassCastException("Cannot cast " + dataStreamFactory.getClass()
-        + " to " + ClientFactory.class
-        + "; stream type is " + dataStreamFactory.getDataStreamType());
+        + " to " + DataStreamClientFactory.class + "; stream type is " + type);
   }
 
   DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, RaftProperties properties);
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java b/ratis-client/src/main/java/org/apache/ratis/client/DisabledDataStreamClientFactory.java
similarity index 57%
copy from ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
copy to ratis-client/src/main/java/org/apache/ratis/client/DisabledDataStreamClientFactory.java
index c77fb7a..1347e7b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DisabledDataStreamClientFactory.java
@@ -15,26 +15,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.ratis.client;
 
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.datastream.DataStreamFactory;
+import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.protocol.RaftPeer;
 
-/**
- * A factory to create streaming client.
- */
-public interface DataStreamClientFactory extends DataStreamFactory {
+/** A stream factory that does nothing when data stream is disabled. */
+public class DisabledDataStreamClientFactory implements DataStreamClientFactory {
+  public DisabledDataStreamClientFactory(Parameters parameters) {}
 
-  static DataStreamClientFactory cast(DataStreamFactory dataStreamFactory) {
-    if (dataStreamFactory instanceof DataStreamClientFactory) {
-      return (DataStreamClientFactory) dataStreamFactory;
-    }
-    throw new ClassCastException("Cannot cast " + dataStreamFactory.getClass()
-        + " to " + ClientFactory.class
-        + "; stream type is " + dataStreamFactory.getDataStreamType());
+  @Override
+  public SupportedDataStreamType getDataStreamType() {
+    return SupportedDataStreamType.DISABLED;
   }
 
-  DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, RaftProperties properties);
+  @Override
+  public DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, RaftProperties properties) {
+    return new DataStreamClientRpc() {
+      @Override
+      public void startClient() {}
+
+      @Override
+      public void closeClient() {}
+    };
+  }
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 51e9eb6..9316c62 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -60,7 +60,7 @@ public class DataStreamClientImpl implements DataStreamClient {
     this.raftServer = Objects.requireNonNull(server, "server == null");
 
     final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(properties, LOG::info);
-    this.dataStreamClientRpc = DataStreamClientFactory.cast(type.newFactory(parameters))
+    this.dataStreamClientRpc = DataStreamClientFactory.newInstance(type, parameters)
                                .newDataStreamClientRpc(raftServer, properties);
 
     this.orderedStreamAsync = new OrderedStreamAsync(clientId, dataStreamClientRpc, properties);
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamFactory.java b/ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamFactory.java
index 6d88e7c..1e4874a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamFactory.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamFactory.java
@@ -15,9 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.ratis.datastream;
 
-public interface DataStreamFactory {
-  SupportedDataStreamType getDataStreamType();
+public interface DataStreamFactory extends DataStreamType.Get {
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java b/ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamType.java
similarity index 51%
copy from ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
copy to ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamType.java
index 79b08b6..f157fa0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamType.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,37 +15,36 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.ratis.rpc;
+package org.apache.ratis.datastream;
 
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.util.ReflectionUtils;
 
-/** The type of RPC implementations. */
-public interface RpcType {
+/** The type of data stream implementations. */
+public interface DataStreamType {
   /**
-   * Parse the given string as a {@link SupportedRpcType}
-   * or a user-defined {@link RpcType}.
+   * Parse the given string as a {@link SupportedDataStreamType}
+   * or a user-defined {@link DataStreamType}.
    *
-   * @param rpcType The string representation of an {@link RpcType}.
-   * @return a {@link SupportedRpcType} or a user-defined {@link RpcType}.
+   * @param dataStreamType The string representation of an {@link DataStreamType}.
+   * @return a {@link SupportedDataStreamType} or a user-defined {@link DataStreamType}.
    */
-  static RpcType valueOf(String rpcType) {
+  static DataStreamType valueOf(String dataStreamType) {
     final Throwable fromSupportedRpcType;
     try { // Try parsing it as a SupportedRpcType
-      return SupportedRpcType.valueOfIgnoreCase(rpcType);
+      return SupportedDataStreamType.valueOfIgnoreCase(dataStreamType);
     } catch (Throwable t) {
       fromSupportedRpcType = t;
     }
 
     try {
       // Try using it as a class name
-      return ReflectionUtils.newInstance(
-          ReflectionUtils.getClass(rpcType, RpcType.class));
+      return ReflectionUtils.newInstance(ReflectionUtils.getClass(dataStreamType, DataStreamType.class));
     } catch(Throwable t) {
       final IllegalArgumentException iae = new IllegalArgumentException(
-          "Invalid " + RpcType.class.getSimpleName() + ": \"" + rpcType + "\" "
-              + " cannot be used as a user-defined " + RpcType.class.getSimpleName()
-              + " and it is not a " + SupportedRpcType.class.getSimpleName() + ".");
+          "Invalid " + DataStreamType.class.getSimpleName() + ": \"" + dataStreamType + "\" "
+              + " cannot be used as a user-defined " + DataStreamType.class.getSimpleName()
+              + " and it is not a " + SupportedDataStreamType.class.getSimpleName() + ".");
       iae.addSuppressed(t);
       iae.addSuppressed(fromSupportedRpcType);
       throw iae;
@@ -55,12 +54,15 @@ public interface RpcType {
   /** @return the name of the rpc type. */
   String name();
 
-  /** @return a new factory created using the given properties and parameters. */
-  RpcFactory newFactory(Parameters parameters);
+  /** @return a new client factory created using the given parameters. */
+  DataStreamFactory newClientFactory(Parameters parameters);
 
-  /** An interface to get {@link RpcType}. */
+  /** @return a new server factory created using the given parameters. */
+  DataStreamFactory newServerFactory(Parameters parameters);
+
+  /** An interface to get {@link DataStreamType}. */
   interface Get {
-    /** @return the {@link RpcType}. */
-    RpcType getRpcType();
+    /** @return the {@link DataStreamType}. */
+    DataStreamType getDataStreamType();
   }
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java b/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java
index ff9d3da..019093e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java
@@ -20,30 +20,40 @@ package org.apache.ratis.datastream;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.util.ReflectionUtils;
 
-public enum SupportedDataStreamType implements DataStreamFactory {
-  DISABLED("org.apache.ratis.server.impl.DisabledDataStreamFactory"),
+public enum SupportedDataStreamType implements DataStreamType {
+  DISABLED("org.apache.ratis.client.DisabledDataStreamClientFactory",
+      "org.apache.ratis.server.DisabledDataStreamServerFactory"),
   NETTY("org.apache.ratis.netty.NettyDataStreamFactory");
 
-  private final String factoryClassName;
-
   private static final Class<?>[] ARG_CLASSES = {Parameters.class};
 
   public static SupportedDataStreamType valueOfIgnoreCase(String s) {
     return valueOf(s.toUpperCase());
   }
 
+  private final String clientFactoryClassName;
+  private final String serverFactoryClassName;
+
+  SupportedDataStreamType(String clientFactoryClassName, String serverFactoryClassName) {
+    this.clientFactoryClassName = clientFactoryClassName;
+    this.serverFactoryClassName = serverFactoryClassName;
+  }
+
   SupportedDataStreamType(String factoryClassName) {
-    this.factoryClassName = factoryClassName;
+    this(factoryClassName, factoryClassName);
   }
 
   @Override
-  public SupportedDataStreamType getDataStreamType() {
-    return valueOf(this.factoryClassName.toUpperCase());
+  public DataStreamFactory newClientFactory(Parameters parameters) {
+    final Class<? extends DataStreamFactory> clazz = ReflectionUtils.getClass(
+        clientFactoryClassName, DataStreamFactory.class);
+    return ReflectionUtils.newInstance(clazz, ARG_CLASSES, parameters);
   }
 
-  public DataStreamFactory newFactory(Parameters parameters) {
+  @Override
+  public DataStreamFactory newServerFactory(Parameters parameters) {
     final Class<? extends DataStreamFactory> clazz = ReflectionUtils.getClass(
-        factoryClassName, DataStreamFactory.class);
+        serverFactoryClassName, DataStreamFactory.class);
     return ReflectionUtils.newInstance(clazz, ARG_CLASSES, parameters);
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java 
b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
index 79b08b6..2d2f03a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
+++ b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -55,7 +55,7 @@ public interface RpcType {
   /** @return the name of the rpc type. */
   String name();
 
-  /** @return a new factory created using the given properties and parameters. */
+  /** @return a new factory created using the given parameters. */
   RpcFactory newFactory(Parameters parameters);
 
   /** An interface to get {@link RpcType}. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
index edfaf76..63589b0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
@@ -17,22 +17,22 @@
  */
 package org.apache.ratis.server;
 
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.DataStreamFactory;
+import org.apache.ratis.datastream.DataStreamType;
 import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.server.impl.ServerFactory;
 import org.apache.ratis.statemachine.StateMachine;
 
 /** A {@link DataStreamFactory} to create server-side objects. */
 public interface DataStreamServerFactory extends DataStreamFactory {
-
-  static DataStreamServerFactory cast(DataStreamFactory dataStreamFactory) {
+  static DataStreamServerFactory newInstance(DataStreamType type, Parameters parameters) {
+    final DataStreamFactory dataStreamFactory = type.newServerFactory(parameters);
     if (dataStreamFactory instanceof DataStreamServerFactory) {
       return (DataStreamServerFactory)dataStreamFactory;
     }
     throw new ClassCastException("Cannot cast " + dataStreamFactory.getClass()
-        + " to " + ServerFactory.class
-        + "; rpc type is " + dataStreamFactory.getDataStreamType());
+        + " to " + DataStreamServerFactory.class + "; rpc type is " + type);
   }
 
   /** Create a new {@link DataStreamServerRpc}. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DisabledDataStreamFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
similarity index 73%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/DisabledDataStreamFactory.java
rename to ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
index e457916..a4762fa 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/DisabledDataStreamFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
@@ -15,35 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.server.impl;
+package org.apache.ratis.server;
 
-import org.apache.ratis.client.DataStreamClientFactory;
-import org.apache.ratis.client.DataStreamClientRpc;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.server.DataStreamServerFactory;
-import org.apache.ratis.server.DataStreamServerRpc;
-import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.statemachine.StateMachine;
 
 import java.util.Collection;
 
 /** A stream factory that does nothing when data stream is disabled. */
-public class DisabledDataStreamFactory implements DataStreamServerFactory, DataStreamClientFactory {
-  public DisabledDataStreamFactory(Parameters parameters) {}
-
-  @Override
-  public DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, RaftProperties properties) {
-    return new DataStreamClientRpc() {
-      @Override
-      public void startClient() {}
-
-      @Override
-      public void closeClient() {}
-    };
-  }
+public class DisabledDataStreamServerFactory implements DataStreamServerFactory {
+  public DisabledDataStreamServerFactory(Parameters parameters) {}
 
   @Override
   public DataStreamServerRpc newDataStreamServerRpc(
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
index c66829b..4ccbfc8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
@@ -41,7 +41,7 @@ public class DataStreamServerImpl implements DataStreamServer {
       RaftProperties properties, Parameters parameters){
     final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(properties, LOG::info);
 
-    this.serverRpc = DataStreamServerFactory.cast(type.newFactory(parameters))
+    this.serverRpc = DataStreamServerFactory.newInstance(type, parameters)
         .newDataStreamServerRpc(server, stateMachine, properties);
   }
 
@@ -49,7 +49,7 @@ public class DataStreamServerImpl implements DataStreamServer {
       RaftProperties properties, Parameters parameters){
     final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(properties, LOG::info);
 
-    this.serverRpc = DataStreamServerFactory.cast(type.newFactory(parameters))
+    this.serverRpc = DataStreamServerFactory.newInstance(type, parameters)
         .newDataStreamServerRpc(server, stateMachine, properties);
   }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
index 1ecded1..e22efb3 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.datastream;
 
 import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.client.DisabledDataStreamClientFactory;
 import org.apache.ratis.conf.RaftProperties;
 import org.junit.Before;
 import org.junit.Rule;
@@ -41,7 +42,8 @@ public class TestDataStreamDisabled extends TestDataStreamBase {
       setupServer();
       setupClient();
       exception.expect(UnsupportedOperationException.class);
-      exception.expectMessage("org.apache.ratis.server.impl.DisabledDataStreamFactory$1 does not support streamAsync");
+      exception.expectMessage(DisabledDataStreamClientFactory.class.getName()
+          + "$1 does not support streamAsync");
       // stream() will create a header request, thus it will hit UnsupportedOperationException due to
       // DisabledDataStreamFactory.
       client.stream();

Reply via email to