This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c498bc1  RATIS-1108. Add SupportedDataStreamType.DISABLED (#234).  
Contributed by Rui Wang
c498bc1 is described below

commit c498bc147ff0cd2f71bfe1158fa01f64c3d3f221
Author: Rui Wang <[email protected]>
AuthorDate: Thu Oct 29 03:12:09 2020 -0700

    RATIS-1108. Add SupportedDataStreamType.DISABLED (#234).  Contributed by 
Rui Wang
    
    I pushed a commit to fix indentation and remove IOException.
---
 .../ratis/datastream/SupportedDataStreamType.java  |  1 +
 .../server/impl/DisabledDataStreamFactory.java     | 82 ++++++++++++++++++++++
 ...TestDataStream.java => TestDataStreamBase.java} | 45 +++++-------
 .../ratis/datastream/TestDataStreamDisabled.java   | 52 ++++++++++++++
 .../ratis/datastream/TestDataStreamNetty.java      | 45 ++++++++++++
 5 files changed, 197 insertions(+), 28 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java
 
b/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java
index 01d5f89..ff9d3da 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java
@@ -21,6 +21,7 @@ import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.util.ReflectionUtils;
 
 public enum SupportedDataStreamType implements DataStreamFactory {
+  DISABLED("org.apache.ratis.server.impl.DisabledDataStreamFactory"),
   NETTY("org.apache.ratis.netty.NettyDataStreamFactory");
 
   private final String factoryClassName;
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/DisabledDataStreamFactory.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/DisabledDataStreamFactory.java
new file mode 100644
index 0000000..e457916
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/DisabledDataStreamFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.client.DataStreamClientFactory;
+import org.apache.ratis.client.DataStreamClientRpc;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.DataStreamServerFactory;
+import org.apache.ratis.server.DataStreamServerRpc;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.util.Collection;
+
+/** A stream factory that does nothing when data stream is disabled. */
+public class DisabledDataStreamFactory implements DataStreamServerFactory, 
DataStreamClientFactory {
+  public DisabledDataStreamFactory(Parameters parameters) {}
+
+  @Override
+  public DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, 
RaftProperties properties) {
+    return new DataStreamClientRpc() {
+      @Override
+      public void startClient() {}
+
+      @Override
+      public void closeClient() {}
+    };
+  }
+
+  @Override
+  public DataStreamServerRpc newDataStreamServerRpc(
+      RaftPeer server, StateMachine stateMachine, RaftProperties properties) {
+    return new DataStreamServerRpc() {
+      @Override
+      public void start() {}
+
+      @Override
+      public void close() {}
+
+      @Override
+      public void addRaftPeers(Collection<RaftPeer> peers) {}
+    };
+  }
+
+  @Override
+  public DataStreamServerRpc newDataStreamServerRpc(
+      RaftServer server, StateMachine stateMachine, RaftProperties properties) 
{
+    return new DataStreamServerRpc() {
+      @Override
+      public void start() {}
+
+      @Override
+      public void close() {}
+
+      @Override
+      public void addRaftPeers(Collection<RaftPeer> peers) {}
+    };
+  }
+
+  @Override
+  public SupportedDataStreamType getDataStreamType() {
+    return SupportedDataStreamType.DISABLED;
+  }
+}
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
similarity index 90%
rename from 
ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
rename to 
ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
index af930f7..d14b737 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
@@ -15,11 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.ratis.datastream;
 
-import java.io.IOException;
-import java.util.stream.Collectors;
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.client.api.DataStreamOutput;
@@ -36,8 +33,8 @@ import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.NetUtils;
 import org.junit.Assert;
-import org.junit.Test;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
@@ -45,8 +42,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
 
-public class TestDataStream extends BaseTest {
+class TestDataStreamBase extends BaseTest {
   static final int MODULUS = 23;
 
   static byte pos2byte(int pos) {
@@ -116,13 +114,14 @@ public class TestDataStream extends BaseTest {
     }
   }
 
-  private List<RaftPeer> peers;
-  private RaftProperties properties;
+  protected RaftProperties properties;
+  protected DataStreamClientImpl client;
+
   private List<DataStreamServerImpl> servers;
-  private DataStreamClientImpl client;
+  private List<RaftPeer> peers;
   private List<SingleDataStreamStateMachine> singleDataStreamStateMachines;
 
-  private void setupServer(){
+  protected void setupServer(){
     servers = new ArrayList<>(peers.size());
     singleDataStreamStateMachines = new ArrayList<>(peers.size());
     // start stream servers on raft peers.
@@ -143,47 +142,37 @@ public class TestDataStream extends BaseTest {
     }
   }
 
-  private void setupClient(){
+  protected void setupClient(){
     client = new DataStreamClientImpl(peers.get(0), properties, null);
     client.start();
   }
 
-  public void shutdown() throws IOException {
+  protected void shutdown() throws IOException {
     client.close();
     for (DataStreamServerImpl server : servers) {
       server.close();
     }
   }
 
-  @Test
-  public void testDataStreamSingleServer() throws Exception {
-    runTestDataStream(1, 1_000_000, 100);
-    runTestDataStream(1,1_000, 10_000);
-  }
-
-  @Test
-  public void testDataStreamMultipleServer() throws Exception {
-    runTestDataStream(3, 1_000_000, 100);
-    runTestDataStream(3, 1_000, 10_000);
-  }
-
-  void runTestDataStream(int numServers, int bufferSize, int bufferNum) throws 
Exception {
-    properties = new RaftProperties();
+  protected void setupRaftPeers(int numServers) {
     peers = Arrays.stream(MiniRaftCluster.generateIds(numServers, 0))
         .map(RaftPeerId::valueOf)
         .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
         .collect(Collectors.toList());
+  }
 
-    setupServer();
-    setupClient();
+  protected void runTestDataStream(int numServers, int bufferSize, int 
bufferNum) throws Exception {
+    setupRaftPeers(numServers);
     try {
+      setupServer();
+      setupClient();
       runTestDataStream(bufferSize, bufferNum);
     } finally {
       shutdown();
     }
   }
 
-  void runTestDataStream(int bufferSize, int bufferNum) {
+  private void runTestDataStream(int bufferSize, int bufferNum) {
     final DataStreamOutput out = client.stream();
     DataStreamClientImpl.DataStreamOutputImpl impl = 
(DataStreamClientImpl.DataStreamOutputImpl) out;
 
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
new file mode 100644
index 0000000..1ecded1
--- /dev/null
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.datastream;
+
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestDataStreamDisabled extends TestDataStreamBase {
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  @Before
+  public void setup() {
+    properties = new RaftProperties();
+    RaftConfigKeys.DataStream.setType(properties, 
SupportedDataStreamType.DISABLED);
+  }
+
+  @Test
+  public void testDataStreamDisabled() throws Exception {
+    setupRaftPeers(1);
+    try {
+      setupServer();
+      setupClient();
+      exception.expect(UnsupportedOperationException.class);
+      
exception.expectMessage("org.apache.ratis.server.impl.DisabledDataStreamFactory$1
 does not support streamAsync");
+      // stream() will create a header request, thus it will hit 
UnsupportedOperationException due to
+      // DisabledDataStreamFactory.
+      client.stream();
+    } finally {
+      shutdown();
+    }
+  }
+}
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
new file mode 100644
index 0000000..2725b01
--- /dev/null
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.datastream;
+
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestDataStreamNetty extends TestDataStreamBase {
+  @Before
+  public void setup() {
+    properties = new RaftProperties();
+    RaftConfigKeys.DataStream.setType(properties, 
SupportedDataStreamType.NETTY);
+  }
+
+  @Test
+  public void testDataStreamSingleServer() throws Exception {
+    runTestDataStream(1, 1_000_000, 100);
+    runTestDataStream(1,1_000, 10_000);
+  }
+
+  @Test
+  public void testDataStreamMultipleServer() throws Exception {
+    runTestDataStream(3, 1_000_000, 100);
+    runTestDataStream(3, 1_000, 10_000);
+  }
+}

Reply via email to