This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c498bc1 RATIS-1108. Add SupportedDataStreamType.DISABLED (#234).
Contributed by Rui Wang
c498bc1 is described below
commit c498bc147ff0cd2f71bfe1158fa01f64c3d3f221
Author: Rui Wang <[email protected]>
AuthorDate: Thu Oct 29 03:12:09 2020 -0700
RATIS-1108. Add SupportedDataStreamType.DISABLED (#234). Contributed by
Rui Wang
I pushed a commit to fix indentation and remove IOException.
---
.../ratis/datastream/SupportedDataStreamType.java | 1 +
.../server/impl/DisabledDataStreamFactory.java | 82 ++++++++++++++++++++++
...TestDataStream.java => TestDataStreamBase.java} | 45 +++++-------
.../ratis/datastream/TestDataStreamDisabled.java | 52 ++++++++++++++
.../ratis/datastream/TestDataStreamNetty.java | 45 ++++++++++++
5 files changed, 197 insertions(+), 28 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java b/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java
index 01d5f89..ff9d3da 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java
@@ -21,6 +21,7 @@ import org.apache.ratis.conf.Parameters;
import org.apache.ratis.util.ReflectionUtils;
public enum SupportedDataStreamType implements DataStreamFactory {
+ DISABLED("org.apache.ratis.server.impl.DisabledDataStreamFactory"),
NETTY("org.apache.ratis.netty.NettyDataStreamFactory");
private final String factoryClassName;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DisabledDataStreamFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DisabledDataStreamFactory.java
new file mode 100644
index 0000000..e457916
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DisabledDataStreamFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.client.DataStreamClientFactory;
+import org.apache.ratis.client.DataStreamClientRpc;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.DataStreamServerFactory;
+import org.apache.ratis.server.DataStreamServerRpc;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.util.Collection;
+
+/** A stream factory that does nothing when data stream is disabled. */
+public class DisabledDataStreamFactory implements DataStreamServerFactory,
DataStreamClientFactory {
+ public DisabledDataStreamFactory(Parameters parameters) {}
+
+ @Override
+ public DataStreamClientRpc newDataStreamClientRpc(RaftPeer server,
RaftProperties properties) {
+ return new DataStreamClientRpc() {
+ @Override
+ public void startClient() {}
+
+ @Override
+ public void closeClient() {}
+ };
+ }
+
+ @Override
+ public DataStreamServerRpc newDataStreamServerRpc(
+ RaftPeer server, StateMachine stateMachine, RaftProperties properties) {
+ return new DataStreamServerRpc() {
+ @Override
+ public void start() {}
+
+ @Override
+ public void close() {}
+
+ @Override
+ public void addRaftPeers(Collection<RaftPeer> peers) {}
+ };
+ }
+
+ @Override
+ public DataStreamServerRpc newDataStreamServerRpc(
+ RaftServer server, StateMachine stateMachine, RaftProperties properties)
{
+ return new DataStreamServerRpc() {
+ @Override
+ public void start() {}
+
+ @Override
+ public void close() {}
+
+ @Override
+ public void addRaftPeers(Collection<RaftPeer> peers) {}
+ };
+ }
+
+ @Override
+ public SupportedDataStreamType getDataStreamType() {
+ return SupportedDataStreamType.DISABLED;
+ }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
similarity index 90%
rename from ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
rename to ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
index af930f7..d14b737 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
@@ -15,11 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.ratis.datastream;
-import java.io.IOException;
-import java.util.stream.Collectors;
import org.apache.ratis.BaseTest;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.client.api.DataStreamOutput;
@@ -36,8 +33,8 @@ import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.NetUtils;
import org.junit.Assert;
-import org.junit.Test;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
@@ -45,8 +42,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
-public class TestDataStream extends BaseTest {
+class TestDataStreamBase extends BaseTest {
static final int MODULUS = 23;
static byte pos2byte(int pos) {
@@ -116,13 +114,14 @@ public class TestDataStream extends BaseTest {
}
}
- private List<RaftPeer> peers;
- private RaftProperties properties;
+ protected RaftProperties properties;
+ protected DataStreamClientImpl client;
+
private List<DataStreamServerImpl> servers;
- private DataStreamClientImpl client;
+ private List<RaftPeer> peers;
private List<SingleDataStreamStateMachine> singleDataStreamStateMachines;
- private void setupServer(){
+ protected void setupServer(){
servers = new ArrayList<>(peers.size());
singleDataStreamStateMachines = new ArrayList<>(peers.size());
// start stream servers on raft peers.
@@ -143,47 +142,37 @@ public class TestDataStream extends BaseTest {
}
}
- private void setupClient(){
+ protected void setupClient(){
client = new DataStreamClientImpl(peers.get(0), properties, null);
client.start();
}
- public void shutdown() throws IOException {
+ protected void shutdown() throws IOException {
client.close();
for (DataStreamServerImpl server : servers) {
server.close();
}
}
- @Test
- public void testDataStreamSingleServer() throws Exception {
- runTestDataStream(1, 1_000_000, 100);
- runTestDataStream(1,1_000, 10_000);
- }
-
- @Test
- public void testDataStreamMultipleServer() throws Exception {
- runTestDataStream(3, 1_000_000, 100);
- runTestDataStream(3, 1_000, 10_000);
- }
-
- void runTestDataStream(int numServers, int bufferSize, int bufferNum) throws
Exception {
- properties = new RaftProperties();
+ protected void setupRaftPeers(int numServers) {
peers = Arrays.stream(MiniRaftCluster.generateIds(numServers, 0))
.map(RaftPeerId::valueOf)
.map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
.collect(Collectors.toList());
+ }
- setupServer();
- setupClient();
+ protected void runTestDataStream(int numServers, int bufferSize, int
bufferNum) throws Exception {
+ setupRaftPeers(numServers);
try {
+ setupServer();
+ setupClient();
runTestDataStream(bufferSize, bufferNum);
} finally {
shutdown();
}
}
- void runTestDataStream(int bufferSize, int bufferNum) {
+ private void runTestDataStream(int bufferSize, int bufferNum) {
final DataStreamOutput out = client.stream();
DataStreamClientImpl.DataStreamOutputImpl impl =
(DataStreamClientImpl.DataStreamOutputImpl) out;
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
new file mode 100644
index 0000000..1ecded1
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.datastream;
+
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestDataStreamDisabled extends TestDataStreamBase {
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ @Before
+ public void setup() {
+ properties = new RaftProperties();
+ RaftConfigKeys.DataStream.setType(properties,
SupportedDataStreamType.DISABLED);
+ }
+
+ @Test
+ public void testDataStreamDisabled() throws Exception {
+ setupRaftPeers(1);
+ try {
+ setupServer();
+ setupClient();
+ exception.expect(UnsupportedOperationException.class);
+
exception.expectMessage("org.apache.ratis.server.impl.DisabledDataStreamFactory$1
does not support streamAsync");
+ // stream() will create a header request, thus it will hit
UnsupportedOperationException due to
+ // DisabledDataStreamFactory.
+ client.stream();
+ } finally {
+ shutdown();
+ }
+ }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
new file mode 100644
index 0000000..2725b01
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.datastream;
+
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestDataStreamNetty extends TestDataStreamBase {
+ @Before
+ public void setup() {
+ properties = new RaftProperties();
+ RaftConfigKeys.DataStream.setType(properties,
SupportedDataStreamType.NETTY);
+ }
+
+ @Test
+ public void testDataStreamSingleServer() throws Exception {
+ runTestDataStream(1, 1_000_000, 100);
+ runTestDataStream(1,1_000, 10_000);
+ }
+
+ @Test
+ public void testDataStreamMultipleServer() throws Exception {
+ runTestDataStream(3, 1_000_000, 100);
+ runTestDataStream(3, 1_000, 10_000);
+ }
+}