This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch revert-234-RATIS-1108 in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
commit a5eceb6bd8d1b59d2d219eb50b6a4e2a2a44f868 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Thu Oct 29 18:14:22 2020 +0800 Revert "RATIS-1108. Add SupportedDataStreamType.DISABLED (#234). Contributed by Rui Wang" This reverts commit c498bc147ff0cd2f71bfe1158fa01f64c3d3f221. --- .../ratis/datastream/SupportedDataStreamType.java | 1 - .../server/impl/DisabledDataStreamFactory.java | 82 ---------------------- ...TestDataStreamBase.java => TestDataStream.java} | 45 +++++++----- .../ratis/datastream/TestDataStreamDisabled.java | 52 -------------- .../ratis/datastream/TestDataStreamNetty.java | 45 ------------ 5 files changed, 28 insertions(+), 197 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java b/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java index ff9d3da..01d5f89 100644 --- a/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java +++ b/ratis-common/src/main/java/org/apache/ratis/datastream/SupportedDataStreamType.java @@ -21,7 +21,6 @@ import org.apache.ratis.conf.Parameters; import org.apache.ratis.util.ReflectionUtils; public enum SupportedDataStreamType implements DataStreamFactory { - DISABLED("org.apache.ratis.server.impl.DisabledDataStreamFactory"), NETTY("org.apache.ratis.netty.NettyDataStreamFactory"); private final String factoryClassName; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DisabledDataStreamFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DisabledDataStreamFactory.java deleted file mode 100644 index e457916..0000000 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/DisabledDataStreamFactory.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.server.impl; - -import org.apache.ratis.client.DataStreamClientFactory; -import org.apache.ratis.client.DataStreamClientRpc; -import org.apache.ratis.conf.Parameters; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.datastream.SupportedDataStreamType; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.server.DataStreamServerFactory; -import org.apache.ratis.server.DataStreamServerRpc; -import org.apache.ratis.server.RaftServer; -import org.apache.ratis.statemachine.StateMachine; - -import java.util.Collection; - -/** A stream factory that does nothing when data stream is disabled. */ -public class DisabledDataStreamFactory implements DataStreamServerFactory, DataStreamClientFactory { - public DisabledDataStreamFactory(Parameters parameters) {} - - @Override - public DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, RaftProperties properties) { - return new DataStreamClientRpc() { - @Override - public void startClient() {} - - @Override - public void closeClient() {} - }; - } - - @Override - public DataStreamServerRpc newDataStreamServerRpc( - RaftPeer server, StateMachine stateMachine, RaftProperties properties) { - return new DataStreamServerRpc() { - @Override - public void start() {} - - @Override - public void close() {} - - @Override - public void addRaftPeers(Collection<RaftPeer> peers) {} - }; - } - - @Override - public DataStreamServerRpc newDataStreamServerRpc( - RaftServer server, StateMachine stateMachine, RaftProperties properties) { - return new DataStreamServerRpc() { - @Override - public void start() {} - - @Override - public void close() {} - - @Override - public void addRaftPeers(Collection<RaftPeer> peers) {} - }; - } - - @Override - public SupportedDataStreamType getDataStreamType() { - return SupportedDataStreamType.DISABLED; - } -} diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java similarity index 90% rename from ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java rename to ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java index d14b737..af930f7 100644 --- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java +++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java @@ -15,8 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.ratis.datastream; +import java.io.IOException; +import java.util.stream.Collectors; import org.apache.ratis.BaseTest; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.client.api.DataStreamOutput; @@ -33,8 +36,8 @@ import org.apache.ratis.statemachine.impl.BaseStateMachine; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.NetUtils; import org.junit.Assert; +import org.junit.Test; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import java.util.ArrayList; @@ -42,9 +45,8 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.Collectors; -class TestDataStreamBase extends BaseTest { +public class TestDataStream extends BaseTest { static final int MODULUS = 23; static byte pos2byte(int pos) { @@ -114,14 +116,13 @@ class TestDataStreamBase extends BaseTest { } } - protected RaftProperties properties; - protected DataStreamClientImpl client; - - private List<DataStreamServerImpl> servers; private List<RaftPeer> peers; + private RaftProperties properties; + private List<DataStreamServerImpl> servers; + private DataStreamClientImpl client; private List<SingleDataStreamStateMachine> singleDataStreamStateMachines; - protected void setupServer(){ + private void setupServer(){ servers = new ArrayList<>(peers.size()); singleDataStreamStateMachines = new ArrayList<>(peers.size()); // start stream servers on raft peers. @@ -142,37 +143,47 @@ class TestDataStreamBase extends BaseTest { } } - protected void setupClient(){ + private void setupClient(){ client = new DataStreamClientImpl(peers.get(0), properties, null); client.start(); } - protected void shutdown() throws IOException { + public void shutdown() throws IOException { client.close(); for (DataStreamServerImpl server : servers) { server.close(); } } - protected void setupRaftPeers(int numServers) { + @Test + public void testDataStreamSingleServer() throws Exception { + runTestDataStream(1, 1_000_000, 100); + runTestDataStream(1,1_000, 10_000); + } + + @Test + public void testDataStreamMultipleServer() throws Exception { + runTestDataStream(3, 1_000_000, 100); + runTestDataStream(3, 1_000, 10_000); + } + + void runTestDataStream(int numServers, int bufferSize, int bufferNum) throws Exception { + properties = new RaftProperties(); peers = Arrays.stream(MiniRaftCluster.generateIds(numServers, 0)) .map(RaftPeerId::valueOf) .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress())) .collect(Collectors.toList()); - } - protected void runTestDataStream(int numServers, int bufferSize, int bufferNum) throws Exception { - setupRaftPeers(numServers); + setupServer(); + setupClient(); try { - setupServer(); - setupClient(); runTestDataStream(bufferSize, bufferNum); } finally { shutdown(); } } - private void runTestDataStream(int bufferSize, int bufferNum) { + void runTestDataStream(int bufferSize, int bufferNum) { final DataStreamOutput out = client.stream(); DataStreamClientImpl.DataStreamOutputImpl impl = (DataStreamClientImpl.DataStreamOutputImpl) out; diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java deleted file mode 100644 index 1ecded1..0000000 --- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.datastream; - -import org.apache.ratis.RaftConfigKeys; -import org.apache.ratis.conf.RaftProperties; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -public class TestDataStreamDisabled extends TestDataStreamBase { - @Rule - public final ExpectedException exception = ExpectedException.none(); - - @Before - public void setup() { - properties = new RaftProperties(); - RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.DISABLED); - } - - @Test - public void testDataStreamDisabled() throws Exception { - setupRaftPeers(1); - try { - setupServer(); - setupClient(); - exception.expect(UnsupportedOperationException.class); - exception.expectMessage("org.apache.ratis.server.impl.DisabledDataStreamFactory$1 does not support streamAsync"); - // stream() will create a header request, thus it will hit UnsupportedOperationException due to - // DisabledDataStreamFactory. - client.stream(); - } finally { - shutdown(); - } - } -} diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java deleted file mode 100644 index 2725b01..0000000 --- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ratis.datastream; - -import org.apache.ratis.RaftConfigKeys; -import org.apache.ratis.conf.RaftProperties; -import org.junit.Before; -import org.junit.Test; - - -public class TestDataStreamNetty extends TestDataStreamBase { - @Before - public void setup() { - properties = new RaftProperties(); - RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.NETTY); - } - - @Test - public void testDataStreamSingleServer() throws Exception { - runTestDataStream(1, 1_000_000, 100); - runTestDataStream(1,1_000, 10_000); - } - - @Test - public void testDataStreamMultipleServer() throws Exception { - runTestDataStream(3, 1_000_000, 100); - runTestDataStream(3, 1_000, 10_000); - } -}
