This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new af35841 RATIS-1116. Add DataStreamType. (#238)
af35841 is described below
commit af358415cc8ebc70665d73e5fc4812b7cb669c75
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Oct 29 21:12:04 2020 +0800
RATIS-1116. Add DataStreamType. (#238)
* RATIS-1116. Add DataStreamType.
* Fix a bug and change the cast(..) method to newInstance(..).
---
.../ratis/client/DataStreamClientFactory.java | 9 ++---
...y.java => DisabledDataStreamClientFactory.java} | 32 +++++++++--------
.../ratis/client/impl/DataStreamClientImpl.java | 2 +-
.../apache/ratis/datastream/DataStreamFactory.java | 4 +--
.../DataStreamType.java} | 42 +++++++++++-----------
.../ratis/datastream/SupportedDataStreamType.java | 28 ++++++++++-----
.../main/java/org/apache/ratis/rpc/RpcType.java | 4 +--
.../ratis/server/DataStreamServerFactory.java | 10 +++---
...y.java => DisabledDataStreamServerFactory.java} | 22 ++----------
.../ratis/server/impl/DataStreamServerImpl.java | 4 +--
.../ratis/datastream/TestDataStreamDisabled.java | 4 ++-
11 files changed, 81 insertions(+), 80 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
index c77fb7a..1e1ea3b 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
@@ -18,22 +18,23 @@
package org.apache.ratis.client;
+import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.DataStreamFactory;
+import org.apache.ratis.datastream.DataStreamType;
import org.apache.ratis.protocol.RaftPeer;
/**
* A factory to create streaming client.
*/
public interface DataStreamClientFactory extends DataStreamFactory {
-
- static DataStreamClientFactory cast(DataStreamFactory dataStreamFactory) {
+ static DataStreamClientFactory newInstance(DataStreamType type, Parameters
parameters) {
+ final DataStreamFactory dataStreamFactory =
type.newClientFactory(parameters);
if (dataStreamFactory instanceof DataStreamClientFactory) {
return (DataStreamClientFactory) dataStreamFactory;
}
throw new ClassCastException("Cannot cast " + dataStreamFactory.getClass()
- + " to " + ClientFactory.class
- + "; stream type is " + dataStreamFactory.getDataStreamType());
+ + " to " + DataStreamClientFactory.class + "; stream type is " + type);
}
DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, RaftProperties
properties);
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
b/ratis-client/src/main/java/org/apache/ratis/client/DisabledDataStreamClientFactory.java
similarity index 57%
copy from
ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
copy to
ratis-client/src/main/java/org/apache/ratis/client/DisabledDataStreamClientFactory.java
index c77fb7a..1347e7b 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/DisabledDataStreamClientFactory.java
@@ -15,26 +15,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.ratis.client;
+import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.datastream.DataStreamFactory;
+import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.protocol.RaftPeer;
-/**
- * A factory to create streaming client.
- */
-public interface DataStreamClientFactory extends DataStreamFactory {
+/** A stream factory that does nothing when data stream is disabled. */
+public class DisabledDataStreamClientFactory implements
DataStreamClientFactory {
+ public DisabledDataStreamClientFactory(Parameters parameters) {}
- static DataStreamClientFactory cast(DataStreamFactory dataStreamFactory) {
- if (dataStreamFactory instanceof DataStreamClientFactory) {
- return (DataStreamClientFactory) dataStreamFactory;
- }
- throw new ClassCastException("Cannot cast " + dataStreamFactory.getClass()
- + " to " + ClientFactory.class
- + "; stream type is " + dataStreamFactory.getDataStreamType());
+ @Override
+ public SupportedDataStreamType getDataStreamType() {
+ return SupportedDataStreamType.DISABLED;
}
- DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, RaftProperties
properties);
+ @Override
+ public DataStreamClientRpc newDataStreamClientRpc(RaftPeer server,
RaftProperties properties) {
+ return new DataStreamClientRpc() {
+ @Override
+ public void startClient() {}
+
+ @Override
+ public void closeClient() {}
+ };
+ }
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 51e9eb6..9316c62 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -60,7 +60,7 @@ public class DataStreamClientImpl implements DataStreamClient
{
this.raftServer = Objects.requireNonNull(server, "server == null");
final SupportedDataStreamType type =
RaftConfigKeys.DataStream.type(properties, LOG::info);
- this.dataStreamClientRpc =
DataStreamClientFactory.cast(type.newFactory(parameters))
+ this.dataStreamClientRpc = DataStreamClientFactory.newInstance(type,
parameters)
.newDataStreamClientRpc(raftServer, properties);
this.orderedStreamAsync = new OrderedStreamAsync(clientId,
dataStreamClientRpc, properties);
diff --git
a/ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamFactory.java
b/ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamFactory.java
index 6d88e7c..1e4874a 100644
---
a/ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamFactory.java
+++
b/ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamFactory.java
@@ -15,9 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.ratis.datastream;
-public interface DataStreamFactory {
- SupportedDataStreamType getDataStreamType();
+public interface DataStreamFactory extends DataStreamType.Get {
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
b/ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamType.java
similarity index 51%
copy from ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
copy to
ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamType.java
index 79b08b6..f157fa0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamType.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,37 +15,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.rpc;
+package org.apache.ratis.datastream;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.util.ReflectionUtils;
-/** The type of RPC implementations. */
-public interface RpcType {
+/** The type of data stream implementations. */
+public interface DataStreamType {
/**
- * Parse the given string as a {@link SupportedRpcType}
- * or a user-defined {@link RpcType}.
+ * Parse the given string as a {@link SupportedDataStreamType}
+ * or a user-defined {@link DataStreamType}.
*
- * @param rpcType The string representation of an {@link RpcType}.
- * @return a {@link SupportedRpcType} or a user-defined {@link RpcType}.
+ * @param dataStreamType The string representation of an {@link
DataStreamType}.
+ * @return a {@link SupportedDataStreamType} or a user-defined {@link
DataStreamType}.
*/
- static RpcType valueOf(String rpcType) {
+ static DataStreamType valueOf(String dataStreamType) {
final Throwable fromSupportedRpcType;
try { // Try parsing it as a SupportedRpcType
- return SupportedRpcType.valueOfIgnoreCase(rpcType);
+ return SupportedDataStreamType.valueOfIgnoreCase(dataStreamType);
} catch (Throwable t) {
fromSupportedRpcType = t;
}
try {
// Try using it as a class name
- return ReflectionUtils.newInstance(
- ReflectionUtils.getClass(rpcType, RpcType.class));
+ return
ReflectionUtils.newInstance(ReflectionUtils.getClass(dataStreamType,
DataStreamType.class));
} catch(Throwable t) {
final IllegalArgumentException iae = new IllegalArgumentException(
- "Invalid " + RpcType.class.getSimpleName() + ": \"" + rpcType + "\" "
- + " cannot be used as a user-defined " +
RpcType.class.getSimpleName()
- + " and it is not a " + SupportedRpcType.class.getSimpleName() +
".");
+ "Invalid " + DataStreamType.class.getSimpleName() + ": \"" +
dataStreamType + "\" "
+ + " cannot be used as a user-defined " +
DataStreamType.class.getSimpleName()
+ + " and it is not a " +
SupportedDataStreamType.class.getSimpleName() + ".");
iae.addSuppressed(t);
iae.addSuppressed(fromSupportedRpcType);
throw iae;
@@ -55,12 +54,15 @@ public interface RpcType {
/** @return the name of the rpc type. */
String name();
- /** @return a new factory created using the given properties and parameters.
*/
- RpcFactory newFactory(Parameters parameters);
+ /** @return a new client factory created using the given parameters. */
+ DataStreamFactory newClientFactory(Parameters parameters);
- /** An interface to get {@link RpcType}. */
+ /** @return a new server factory created using the given parameters. */
+ DataStreamFactory newServerFactory(Parameters parameters);
+
+ /** An interface to get {@link DataStreamType}. */
interface Get {
- /** @return the {@link RpcType}. */
- RpcType getRpcType();
+ /** @return the {@link DataStreamType}. */
+ DataStreamType getDataStreamType();
}
}
\ No newline at end of file
diff --git
a/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java
b/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java
index ff9d3da..019093e 100644
---
a/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java
+++
b/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java
@@ -20,30 +20,40 @@ package org.apache.ratis.datastream;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.util.ReflectionUtils;
-public enum SupportedDataStreamType implements DataStreamFactory {
- DISABLED("org.apache.ratis.server.impl.DisabledDataStreamFactory"),
+public enum SupportedDataStreamType implements DataStreamType {
+ DISABLED("org.apache.ratis.client.DisabledDataStreamClientFactory",
+ "org.apache.ratis.server.DisabledDataStreamServerFactory"),
NETTY("org.apache.ratis.netty.NettyDataStreamFactory");
- private final String factoryClassName;
-
private static final Class<?>[] ARG_CLASSES = {Parameters.class};
public static SupportedDataStreamType valueOfIgnoreCase(String s) {
return valueOf(s.toUpperCase());
}
+ private final String clientFactoryClassName;
+ private final String serverFactoryClassName;
+
+ SupportedDataStreamType(String clientFactoryClassName, String
serverFactoryClassName) {
+ this.clientFactoryClassName = clientFactoryClassName;
+ this.serverFactoryClassName = serverFactoryClassName;
+ }
+
SupportedDataStreamType(String factoryClassName) {
- this.factoryClassName = factoryClassName;
+ this(factoryClassName, factoryClassName);
}
@Override
- public SupportedDataStreamType getDataStreamType() {
- return valueOf(this.factoryClassName.toUpperCase());
+ public DataStreamFactory newClientFactory(Parameters parameters) {
+ final Class<? extends DataStreamFactory> clazz = ReflectionUtils.getClass(
+ clientFactoryClassName, DataStreamFactory.class);
+ return ReflectionUtils.newInstance(clazz, ARG_CLASSES, parameters);
}
- public DataStreamFactory newFactory(Parameters parameters) {
+ @Override
+ public DataStreamFactory newServerFactory(Parameters parameters) {
final Class<? extends DataStreamFactory> clazz = ReflectionUtils.getClass(
- factoryClassName, DataStreamFactory.class);
+ serverFactoryClassName, DataStreamFactory.class);
return ReflectionUtils.newInstance(clazz, ARG_CLASSES, parameters);
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
index 79b08b6..2d2f03a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
+++ b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -55,7 +55,7 @@ public interface RpcType {
/** @return the name of the rpc type. */
String name();
- /** @return a new factory created using the given properties and parameters.
*/
+ /** @return a new factory created using the given parameters. */
RpcFactory newFactory(Parameters parameters);
/** An interface to get {@link RpcType}. */
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
index edfaf76..63589b0 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
@@ -17,22 +17,22 @@
*/
package org.apache.ratis.server;
+import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.DataStreamFactory;
+import org.apache.ratis.datastream.DataStreamType;
import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.server.impl.ServerFactory;
import org.apache.ratis.statemachine.StateMachine;
/** A {@link DataStreamFactory} to create server-side objects. */
public interface DataStreamServerFactory extends DataStreamFactory {
-
- static DataStreamServerFactory cast(DataStreamFactory dataStreamFactory) {
+ static DataStreamServerFactory newInstance(DataStreamType type, Parameters
parameters) {
+ final DataStreamFactory dataStreamFactory =
type.newServerFactory(parameters);
if (dataStreamFactory instanceof DataStreamServerFactory) {
return (DataStreamServerFactory)dataStreamFactory;
}
throw new ClassCastException("Cannot cast " + dataStreamFactory.getClass()
- + " to " + ServerFactory.class
- + "; rpc type is " + dataStreamFactory.getDataStreamType());
+ + " to " + DataStreamServerFactory.class + "; rpc type is " + type);
}
/** Create a new {@link DataStreamServerRpc}. */
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/DisabledDataStreamFactory.java
b/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
similarity index 73%
rename from
ratis-server/src/main/java/org/apache/ratis/server/impl/DisabledDataStreamFactory.java
rename to
ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
index e457916..a4762fa 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/DisabledDataStreamFactory.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
@@ -15,35 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.impl;
+package org.apache.ratis.server;
-import org.apache.ratis.client.DataStreamClientFactory;
-import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.server.DataStreamServerFactory;
-import org.apache.ratis.server.DataStreamServerRpc;
-import org.apache.ratis.server.RaftServer;
import org.apache.ratis.statemachine.StateMachine;
import java.util.Collection;
/** A stream factory that does nothing when data stream is disabled. */
-public class DisabledDataStreamFactory implements DataStreamServerFactory,
DataStreamClientFactory {
- public DisabledDataStreamFactory(Parameters parameters) {}
-
- @Override
- public DataStreamClientRpc newDataStreamClientRpc(RaftPeer server,
RaftProperties properties) {
- return new DataStreamClientRpc() {
- @Override
- public void startClient() {}
-
- @Override
- public void closeClient() {}
- };
- }
+public class DisabledDataStreamServerFactory implements
DataStreamServerFactory {
+ public DisabledDataStreamServerFactory(Parameters parameters) {}
@Override
public DataStreamServerRpc newDataStreamServerRpc(
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
index c66829b..4ccbfc8 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
@@ -41,7 +41,7 @@ public class DataStreamServerImpl implements DataStreamServer
{
RaftProperties properties, Parameters parameters){
final SupportedDataStreamType type =
RaftConfigKeys.DataStream.type(properties, LOG::info);
- this.serverRpc = DataStreamServerFactory.cast(type.newFactory(parameters))
+ this.serverRpc = DataStreamServerFactory.newInstance(type, parameters)
.newDataStreamServerRpc(server, stateMachine, properties);
}
@@ -49,7 +49,7 @@ public class DataStreamServerImpl implements DataStreamServer
{
RaftProperties properties, Parameters parameters){
final SupportedDataStreamType type =
RaftConfigKeys.DataStream.type(properties, LOG::info);
- this.serverRpc = DataStreamServerFactory.cast(type.newFactory(parameters))
+ this.serverRpc = DataStreamServerFactory.newInstance(type, parameters)
.newDataStreamServerRpc(server, stateMachine, properties);
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
index 1ecded1..e22efb3 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
@@ -18,6 +18,7 @@
package org.apache.ratis.datastream;
import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.client.DisabledDataStreamClientFactory;
import org.apache.ratis.conf.RaftProperties;
import org.junit.Before;
import org.junit.Rule;
@@ -41,7 +42,8 @@ public class TestDataStreamDisabled extends
TestDataStreamBase {
setupServer();
setupClient();
exception.expect(UnsupportedOperationException.class);
-
exception.expectMessage("org.apache.ratis.server.impl.DisabledDataStreamFactory$1
does not support streamAsync");
+ exception.expectMessage(DisabledDataStreamClientFactory.class.getName()
+ + "$1 does not support streamAsync");
// stream() will create a header request, thus it will hit
UnsupportedOperationException due to
// DisabledDataStreamFactory.
client.stream();