This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 0f7957048cc HDDS-14490. HadoopRpcOMFollowerProxyProvider should
explicitly use OzoneManagerProtocolPB (#9670)
0f7957048cc is described below
commit 0f7957048cc6c5732b722336959c09b900a1148f
Author: Ivan Andika <[email protected]>
AuthorDate: Sun Feb 1 03:39:35 2026 +0800
HDDS-14490. HadoopRpcOMFollowerProxyProvider should explicitly use
OzoneManagerProtocolPB (#9670)
---
...doopRpcOMFollowerReadFailoverProxyProvider.java | 71 +++++++---------------
.../ozone/om/protocolPB/Hadoop3OmTransport.java | 9 ++-
.../om/protocolPB/OzoneManagerProtocolPB.java | 2 +-
...doopRpcOMFollowerReadFailoverProxyProvider.java | 18 +-----
.../org/apache/hadoop/ozone/om/OmTestUtil.java | 2 +-
...stOzoneManagerHAFollowerReadWithAllRunning.java | 8 +--
...OzoneManagerHAFollowerReadWithStoppedNodes.java | 10 +--
.../ozone/om/TestOzoneManagerHAWithAllRunning.java | 2 +-
.../hadoop/fs/ozone/Hadoop27RpcTransport.java | 7 +--
9 files changed, 43 insertions(+), 86 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
index fa847d2cf85..bc2f9a246d8 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
@@ -29,7 +29,6 @@
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc_.Client.ConnectionId;
@@ -41,7 +40,6 @@
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
-import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,17 +60,14 @@
* Read and write requests will still be sent to leader OM if reading from
* follower is disabled.
*/
-public class HadoopRpcOMFollowerReadFailoverProxyProvider<T> implements
FailoverProxyProvider<T> {
- @VisibleForTesting
- public static final Logger LOG =
LoggerFactory.getLogger(HadoopRpcOMFollowerReadFailoverProxyProvider.class);
-
- private final Class<T> protocolClass;
+public class HadoopRpcOMFollowerReadFailoverProxyProvider implements
FailoverProxyProvider<OzoneManagerProtocolPB> {
+ private static final Logger LOG =
LoggerFactory.getLogger(HadoopRpcOMFollowerReadFailoverProxyProvider.class);
/** The inner proxy provider used for leader-based failover. */
- private final HadoopRpcOMFailoverProxyProvider<T> failoverProxy;
+ private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
failoverProxy;
/** The combined proxy which redirects to other proxies as necessary. */
- private final ProxyInfo<T> combinedProxy;
+ private final ProxyInfo<OzoneManagerProtocolPB> combinedProxy;
/**
* Whether reading from follower is enabled. If this is false, all read
@@ -87,52 +82,35 @@ public class
HadoopRpcOMFollowerReadFailoverProxyProvider<T> implements Failover
private int currentIndex = -1;
/** The last proxy that has been used. Only used for testing. */
- private volatile OMProxyInfo<T> lastProxy = null;
+ private volatile OMProxyInfo<OzoneManagerProtocolPB> lastProxy = null;
public HadoopRpcOMFollowerReadFailoverProxyProvider(
- ConfigurationSource configuration, UserGroupInformation ugi, String
omServiceId, Class<T> protocol)
- throws IOException {
- this(omServiceId, protocol,
- new HadoopRpcOMFailoverProxyProvider<>(configuration, ugi,
omServiceId, protocol));
- }
-
- public HadoopRpcOMFollowerReadFailoverProxyProvider(String omServiceId,
Class<T> protocol,
- HadoopRpcOMFailoverProxyProvider<T> failoverProxy) {
- this.protocolClass = protocol;
+ HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> failoverProxy) {
this.failoverProxy = failoverProxy;
-
// Create a wrapped proxy containing all the proxies. Since this combined
// proxy is just redirecting to other proxies, all invocations can share
it.
final String combinedInfo = "[" + failoverProxy.getOMProxies().stream()
.map(a -> a.proxyInfo)
.reduce((a, b) -> a + ", " + b).orElse("") + "]";
- @SuppressWarnings("unchecked")
- T wrappedProxy = (T) Proxy.newProxyInstance(
+ OzoneManagerProtocolPB wrappedProxy = (OzoneManagerProtocolPB)
Proxy.newProxyInstance(
FollowerReadInvocationHandler.class.getClassLoader(),
- new Class<?>[] {protocol}, new FollowerReadInvocationHandler());
+ new Class<?>[] {OzoneManagerProtocolPB.class}, new
FollowerReadInvocationHandler());
combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo);
-
- if (wrappedProxy instanceof OzoneManagerProtocolPB) {
- this.useFollowerRead = true;
- } else {
- LOG.debug("Disabling follower reads for {} because the requested proxy "
- + "class does not implement {}", omServiceId,
OzoneManagerProtocolPB.class.getName());
- this.useFollowerRead = false;
- }
+ this.useFollowerRead = true;
}
@Override
- public Class<T> getInterface() {
- return protocolClass;
+ public Class<OzoneManagerProtocolPB> getInterface() {
+ return OzoneManagerProtocolPB.class;
}
@Override
- public ProxyInfo<T> getProxy() {
+ public ProxyInfo<OzoneManagerProtocolPB> getProxy() {
return combinedProxy;
}
@Override
- public void performFailover(T currProxy) {
+ public void performFailover(OzoneManagerProtocolPB currProxy) {
// Since FollowerReadInvocationHandler might use or fall back to
leader-based failover logic,
// we should delegate the failover logic to the leader's failover.
failoverProxy.performFailover(currProxy);
@@ -174,12 +152,7 @@ private static OMRequest parseOMRequest(Object[] args)
throws ServiceException {
}
@VisibleForTesting
- void setUseFollowerRead(boolean flag) {
- this.useFollowerRead = flag;
- }
-
- @VisibleForTesting
- public ProxyInfo<T> getLastProxy() {
+ public ProxyInfo<OzoneManagerProtocolPB> getLastProxy() {
return lastProxy;
}
@@ -188,7 +161,7 @@ public ProxyInfo<T> getLastProxy() {
* {@link #changeProxy(OMProxyInfo)} to initialize one.
*/
@VisibleForTesting
- public OMProxyInfo<T> getCurrentProxy() {
+ public OMProxyInfo<OzoneManagerProtocolPB> getCurrentProxy() {
return changeProxy(null);
}
@@ -202,13 +175,13 @@ public OMProxyInfo<T> getCurrentProxy() {
* @param initial The expected current proxy
* @return The new proxy that should be used.
*/
- private synchronized OMProxyInfo<T> changeProxy(OMProxyInfo<T> initial) {
- OMProxyInfo<T> currentProxy =
failoverProxy.getOMProxyMap().get(currentIndex);
+ private synchronized OMProxyInfo<OzoneManagerProtocolPB>
changeProxy(OMProxyInfo<OzoneManagerProtocolPB> initial) {
+ OMProxyInfo<OzoneManagerProtocolPB> currentProxy =
failoverProxy.getOMProxyMap().get(currentIndex);
if (currentProxy != initial) {
// Must have been a concurrent modification; ignore the move request
return currentProxy;
}
- final OMProxyInfo.OrderedMap<T> omProxies = failoverProxy.getOMProxyMap();
+ final OMProxyInfo.OrderedMap<OzoneManagerProtocolPB> omProxies =
failoverProxy.getOMProxyMap();
currentIndex = (currentIndex + 1) % omProxies.size();
final String currentOmNodeId = omProxies.getNodeId(currentIndex);
currentProxy = failoverProxy.createOMProxyIfNeeded(currentOmNodeId);
@@ -243,7 +216,7 @@ public Object invoke(Object proxy, final Method method,
final Object[] args)
if (useFollowerRead && OmUtils.shouldSendToFollower(omRequest)) {
int failedCount = 0;
for (int i = 0; useFollowerRead && i <
failoverProxy.getOMProxyMap().size(); i++) {
- OMProxyInfo<T> current = getCurrentProxy();
+ OMProxyInfo<OzoneManagerProtocolPB> current = getCurrentProxy();
LOG.debug("Attempting to service {} with cmdType {} using proxy {}",
method.getName(), omRequest.getCmdType(), current.proxyInfo);
try {
@@ -335,7 +308,7 @@ public Object invoke(Object proxy, final Method method,
final Object[] args)
// or this is a write request. In any case, forward the request to
// the leader OM.
LOG.debug("Using leader-based failoverProxy to service {}",
method.getName());
- final OMProxyInfo<T> leaderProxy = failoverProxy.getProxy();
+ final OMProxyInfo<OzoneManagerProtocolPB> leaderProxy =
failoverProxy.getProxy();
Object retVal = null;
try {
retVal = method.invoke(leaderProxy.getProxy(), args);
@@ -374,12 +347,12 @@ public boolean isUseFollowerRead() {
}
@VisibleForTesting
- public List<OMProxyInfo<T>> getOMProxies() {
+ public List<OMProxyInfo<OzoneManagerProtocolPB>> getOMProxies() {
return failoverProxy.getOMProxies();
}
public synchronized void changeInitialProxyForTest(String initialOmNodeId) {
- final OMProxyInfo<T> currentProxy =
failoverProxy.getOMProxyMap().get(currentIndex);
+ final OMProxyInfo<OzoneManagerProtocolPB> currentProxy =
failoverProxy.getOMProxyMap().get(currentIndex);
if (currentProxy != null &&
currentProxy.getNodeId().equals(initialOmNodeId)) {
return;
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
index 9614e403f10..560beaec4b1 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
@@ -46,7 +46,7 @@ public class Hadoop3OmTransport implements OmTransport {
private static final RpcController NULL_RPC_CONTROLLER = null;
private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider;
- private HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
followerReadFailoverProxyProvider;
+ private HadoopRpcOMFollowerReadFailoverProxyProvider
followerReadFailoverProxyProvider;
private final OzoneManagerProtocolPB rpcProxy;
@@ -74,9 +74,8 @@ public Hadoop3OmTransport(ConfigurationSource conf,
// So instead of enabling using follower read configuration, we can
simply let the user configure the
// failover proxy provider instead (similar to
dfs.client.failover.proxy.provider.<nameservice>)
if (followerReadEnabled) {
- this.followerReadFailoverProxyProvider = new
HadoopRpcOMFollowerReadFailoverProxyProvider<>(
- omServiceId, OzoneManagerProtocolPB.class, omFailoverProxyProvider
- );
+ this.followerReadFailoverProxyProvider =
+ new
HadoopRpcOMFollowerReadFailoverProxyProvider(omFailoverProxyProvider);
this.rpcProxy =
OzoneManagerProtocolPB.newProxy(followerReadFailoverProxyProvider,
maxFailovers);
} else {
// TODO: It should be possible to simply instantiate
HadoopRpcOMFollowerReadFailoverProxyProvider
@@ -112,7 +111,7 @@ public
HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> getOmFailoverPro
}
@VisibleForTesting
- public HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
+ public HadoopRpcOMFollowerReadFailoverProxyProvider
getOmFollowerReadFailoverProxyProvider() {
return followerReadFailoverProxyProvider;
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
index 16f663ec574..6aa9c4da594 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
@@ -46,7 +46,7 @@ static OzoneManagerProtocolPB
newProxy(OMFailoverProxyProviderBase<OzoneManagerP
failoverProxyProvider.getRetryPolicy(maxFailovers));
}
- static OzoneManagerProtocolPB
newProxy(HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
+ static OzoneManagerProtocolPB
newProxy(HadoopRpcOMFollowerReadFailoverProxyProvider
followerReadFailoverProxyProvider, int maxFailovers) {
return (OzoneManagerProtocolPB)
RetryProxy.create(OzoneManagerProtocolPB.class,
followerReadFailoverProxyProvider,
followerReadFailoverProxyProvider.getRetryPolicy(maxFailovers));
diff --git
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestHadoopRpcOMFollowerReadFailoverProxyProvider.java
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestHadoopRpcOMFollowerReadFailoverProxyProvider.java
index 2494cf2f642..3587ca5dacb 100644
---
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestHadoopRpcOMFollowerReadFailoverProxyProvider.java
+++
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestHadoopRpcOMFollowerReadFailoverProxyProvider.java
@@ -58,7 +58,6 @@
import org.apache.hadoop.ozone.ha.ConfUtils;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
-import org.apache.hadoop.ozone.om.protocolPB.OMAdminProtocolPB;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetKeyInfoRequest;
@@ -80,23 +79,12 @@ public class
TestHadoopRpcOMFollowerReadFailoverProxyProvider {
private static final long SLOW_RESPONSE_SLEEP_TIME =
TimeUnit.SECONDS.toMillis(2);
private static final String OM_SERVICE_ID = "om-service-test1";
private static final String NODE_ID_BASE_STR = "omNode-";
- private OzoneConfiguration conf;
- private HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
proxyProvider;
+ private HadoopRpcOMFollowerReadFailoverProxyProvider proxyProvider;
private OzoneManagerProtocolPB retryProxy;
private String[] omNodeIds;
private OMAnswer[] omNodeAnswers;
- @Test
- public void testWithNonClientProxy() throws Exception {
- setupProxyProvider(2);
- HadoopRpcOMFollowerReadFailoverProxyProvider<OMAdminProtocolPB>
adminProxyProvider =
- new HadoopRpcOMFollowerReadFailoverProxyProvider<>(conf,
- UserGroupInformation.getCurrentUser(), OM_SERVICE_ID,
OMAdminProtocolPB.class);
- // follower read is only enabled for OzoneManagerProtocolPB and disabled
otherwise
- assertFalse(adminProxyProvider.isUseFollowerRead());
- }
-
@Test
void testWriteOperationOnLeader() throws Exception {
setupProxyProvider(3);
@@ -429,8 +417,7 @@ protected List<OMProxyInfo<OzoneManagerProtocolPB>>
initOmProxiesFromConfigs(
};
// Wrap the leader-based failover proxy provider with follower read proxy
provider
- proxyProvider = new HadoopRpcOMFollowerReadFailoverProxyProvider<>(
- OM_SERVICE_ID, OzoneManagerProtocolPB.class, underlyingProxyProvider);
+ proxyProvider = new
HadoopRpcOMFollowerReadFailoverProxyProvider(underlyingProxyProvider);
assertTrue(proxyProvider.isUseFollowerRead());
// Wrap the follower read proxy provider in retry proxy to allow automatic
failover
retryProxy = (OzoneManagerProtocolPB) RetryProxy.create(
@@ -440,7 +427,6 @@ protected List<OMProxyInfo<OzoneManagerProtocolPB>>
initOmProxiesFromConfigs(
// This is currently added to prevent IllegalStateException in
// Client#setCallIdAndRetryCount since it seems that callId is set but not
unset properly
RetryInvocationHandler.SET_CALL_ID_FOR_TEST.set(false);
- conf = config;
}
private void doRead() throws Exception {
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java
index c5a52049128..1f66b38c309 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java
@@ -35,7 +35,7 @@ static
HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> getFailoverProxy
return transport.getOmFailoverProxyProvider();
}
- static HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
getFollowerReadFailoverProxyProvider(
+ static HadoopRpcOMFollowerReadFailoverProxyProvider
getFollowerReadFailoverProxyProvider(
ObjectStore store) {
OzoneManagerProtocolClientSideTranslatorPB ozoneManagerClient =
(OzoneManagerProtocolClientSideTranslatorPB)
store.getClientProxy().getOzoneManagerClient();
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
index 7ed84972142..1d1d47c7250 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
@@ -78,7 +78,7 @@ public class TestOzoneManagerHAFollowerReadWithAllRunning
extends TestOzoneManag
@Test
void testOMFollowerReadProxyProviderInitialization() {
- HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
followerReadFailoverProxyProvider =
+ HadoopRpcOMFollowerReadFailoverProxyProvider
followerReadFailoverProxyProvider =
OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore());
List<OMProxyInfo<OzoneManagerProtocolPB>> omProxies =
@@ -104,7 +104,7 @@ void testOMFollowerReadProxyProviderInitialization() {
@Test
void testFollowerReadTargetsFollower() throws Exception {
ObjectStore objectStore = getObjectStore();
- HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
followerReadFailoverProxyProvider =
+ HadoopRpcOMFollowerReadFailoverProxyProvider
followerReadFailoverProxyProvider =
OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore);
String leaderOMNodeId = getCluster().getOMLeader().getOMNodeId();
@@ -199,7 +199,7 @@ void testLinearizableReadConsistency() throws Exception {
assertNotSame(
OmTestUtil.getFailoverProxyProvider(getObjectStore()),
OmTestUtil.getFailoverProxyProvider(anotherObjectStore));
- HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
otherClientFollowerReadProxyProvider =
+ HadoopRpcOMFollowerReadFailoverProxyProvider
otherClientFollowerReadProxyProvider =
OmTestUtil.getFollowerReadFailoverProxyProvider(anotherObjectStore);
assertNotSame(
OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore()),
@@ -426,7 +426,7 @@ public void testAllBucketOperations() throws Exception {
void testOMResponseLeaderOmNodeId() throws Exception {
HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider =
OmTestUtil.getFailoverProxyProvider(getObjectStore());
- HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
followerReadFailoverProxyProvider =
+ HadoopRpcOMFollowerReadFailoverProxyProvider
followerReadFailoverProxyProvider =
OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore());
// Make sure All OMs are ready
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithStoppedNodes.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithStoppedNodes.java
index a2e89a9e5b5..7361b800a3a 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithStoppedNodes.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithStoppedNodes.java
@@ -141,7 +141,7 @@ private void testMultipartUploadWithOneOmNodeDown() throws
Exception {
HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider =
OmTestUtil.getFailoverProxyProvider(getObjectStore());
- HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
followerReadFailoverProxyProvider =
+ HadoopRpcOMFollowerReadFailoverProxyProvider
followerReadFailoverProxyProvider =
OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore());
// The omFailoverProxyProvider will point to the current leader OM node.
@@ -233,7 +233,7 @@ void testLeaderOmProxyProviderFailoverOnConnectionFailure()
throws Exception {
@Test
void testFollowerReadOmProxyProviderFailoverOnConnectionFailure() throws
Exception {
ObjectStore objectStore = getObjectStore();
- HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
followerReadFailoverProxyProvider =
+ HadoopRpcOMFollowerReadFailoverProxyProvider
followerReadFailoverProxyProvider =
OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore);
String firstProxyNodeId =
followerReadFailoverProxyProvider.getCurrentProxy().getNodeId();
@@ -258,7 +258,7 @@ void
testFollowerReadOmProxyProviderFailoverOnConnectionFailure() throws Excepti
@Test
void testFollowerReadSkipsStoppedFollower() throws Exception {
ObjectStore objectStore = getObjectStore();
- HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
followerReadFailoverProxyProvider =
+ HadoopRpcOMFollowerReadFailoverProxyProvider
followerReadFailoverProxyProvider =
OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore);
String leaderOMNodeId = getCluster().getOMLeader().getOMNodeId();
@@ -332,13 +332,13 @@ void testOMRetryProxy() {
private void changeFollowerReadInitialProxy(int omIndex) {
// Change the initial proxy to the OM to be stopped to test follower read
failover
- HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
followerReadFailoverProxyProvider =
+ HadoopRpcOMFollowerReadFailoverProxyProvider
followerReadFailoverProxyProvider =
OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore());
followerReadFailoverProxyProvider.changeInitialProxyForTest(getCluster().getOzoneManager(omIndex).getOMNodeId());
}
private void changeFollowerReadInitialProxy(String omNodeId) {
- HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
followerReadFailoverProxyProvider =
+ HadoopRpcOMFollowerReadFailoverProxyProvider
followerReadFailoverProxyProvider =
OmTestUtil.getFollowerReadFailoverProxyProvider(getObjectStore());
followerReadFailoverProxyProvider.changeInitialProxyForTest(omNodeId);
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
index 1baa2fe9bdc..8636fe0c24e 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java
@@ -1130,7 +1130,7 @@ void testOMFollowerReadWithClusterDisabled() throws
Exception {
ObjectStore objectStore = ozoneClient.getObjectStore();
HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
leaderFailoverProxyProvider =
OmTestUtil.getFailoverProxyProvider(objectStore);
- HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
followerReadFailoverProxyProvider =
+ HadoopRpcOMFollowerReadFailoverProxyProvider
followerReadFailoverProxyProvider =
OmTestUtil.getFollowerReadFailoverProxyProvider(objectStore);
assertNotNull(followerReadFailoverProxyProvider);
assertTrue(followerReadFailoverProxyProvider.isUseFollowerRead());
diff --git
a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
index 919e06dc33f..ee2d9a9bc52 100644
---
a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
+++
b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
@@ -46,7 +46,7 @@ public class Hadoop27RpcTransport implements OmTransport {
private final OzoneManagerProtocolPB rpcProxy;
private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider;
- private HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB>
followerReadFailoverProxyProvider;
+ private HadoopRpcOMFollowerReadFailoverProxyProvider
followerReadFailoverProxyProvider;
public Hadoop27RpcTransport(ConfigurationSource conf,
UserGroupInformation ugi, String omServiceId) throws IOException {
@@ -68,9 +68,8 @@ public Hadoop27RpcTransport(ConfigurationSource conf,
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
if (followerReadEnabled) {
- this.followerReadFailoverProxyProvider = new
HadoopRpcOMFollowerReadFailoverProxyProvider<>(
- omServiceId, OzoneManagerProtocolPB.class, omFailoverProxyProvider
- );
+ this.followerReadFailoverProxyProvider =
+ new
HadoopRpcOMFollowerReadFailoverProxyProvider(omFailoverProxyProvider);
this.rpcProxy =
OzoneManagerProtocolPB.newProxy(followerReadFailoverProxyProvider,
maxFailovers);
} else {
// TODO: It should be possible to simply instantiate
HadoopRpcOMFollowerReadFailoverProxyProvider
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]