This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 99f4132744c HDDS-14682. Unify OzoneManagerProtocolPB failover proxy
provider (#9929)
99f4132744c is described below
commit 99f4132744cb373bfb1d3ff3e02e508c9d05c84b
Author: Ivan Andika <[email protected]>
AuthorDate: Fri Mar 27 21:22:19 2026 +0800
HDDS-14682. Unify OzoneManagerProtocolPB failover proxy provider (#9929)
---
...doopRpcOMFollowerReadFailoverProxyProvider.java | 55 +++++++++----------
.../ozone/om/protocolPB/Hadoop3OmTransport.java | 61 ++++++----------------
...stOzoneManagerHAFollowerReadWithAllRunning.java | 5 +-
.../hadoop/fs/ozone/Hadoop27RpcTransport.java | 61 ++++++----------------
4 files changed, 63 insertions(+), 119 deletions(-)
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
index 305f5c51769..eec55683f32 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
@@ -71,7 +71,7 @@ public class HadoopRpcOMFollowerReadFailoverProxyProvider
implements FailoverPro
private static final Logger LOG =
LoggerFactory.getLogger(HadoopRpcOMFollowerReadFailoverProxyProvider.class);
/** The inner proxy provider used for leader-based failover. */
- private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
failoverProxy;
+ private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
leaderProxy;
/** The combined proxy which redirects to other proxies as necessary. */
private final ProxyInfo<OzoneManagerProtocolPB> combinedProxy;
@@ -97,30 +97,31 @@ public class HadoopRpcOMFollowerReadFailoverProxyProvider
implements FailoverPro
private final ReadConsistencyHint leaderReadConsistency;
public HadoopRpcOMFollowerReadFailoverProxyProvider(
- HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> failoverProxy
+ HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> leaderProxy
) {
- this(failoverProxy, ReadConsistency.LINEARIZABLE_ALLOW_FOLLOWER,
ReadConsistency.DEFAULT);
+ this(leaderProxy, ReadConsistency.LINEARIZABLE_ALLOW_FOLLOWER,
ReadConsistency.DEFAULT, true);
}
public HadoopRpcOMFollowerReadFailoverProxyProvider(
- HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> failoverProxy,
+ HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> leaderProxy,
ReadConsistency followerReadConsistencyType,
- ReadConsistency leaderReadConsistencyType) {
+ ReadConsistency leaderReadConsistencyType,
+ boolean useFollowerRead) {
Preconditions.assertTrue(followerReadConsistencyType.allowFollowerRead(),
"Invalid follower read consistency " + followerReadConsistencyType);
Preconditions.assertTrue(!leaderReadConsistencyType.allowFollowerRead(),
"Invalid leader read consistency " + leaderReadConsistencyType);
- this.failoverProxy = failoverProxy;
+ this.leaderProxy = leaderProxy;
// Create a wrapped proxy containing all the proxies. Since this combined
// proxy is just redirecting to other proxies, all invocations can share
it.
- final String combinedInfo = "[" + failoverProxy.getOMProxies().stream()
+ final String combinedInfo = "[" + leaderProxy.getOMProxies().stream()
.map(a -> a.proxyInfo)
.reduce((a, b) -> a + ", " + b).orElse("") + "]";
OzoneManagerProtocolPB wrappedProxy = (OzoneManagerProtocolPB)
Proxy.newProxyInstance(
FollowerReadInvocationHandler.class.getClassLoader(),
new Class<?>[] {OzoneManagerProtocolPB.class}, new
FollowerReadInvocationHandler());
combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo);
- this.useFollowerRead = true;
+ this.useFollowerRead = useFollowerRead;
this.followerReadConsistency = followerReadConsistencyType.getHint();
this.leaderReadConsistency = leaderReadConsistencyType.getHint();
}
@@ -139,7 +140,7 @@ public ProxyInfo<OzoneManagerProtocolPB> getProxy() {
public void performFailover(OzoneManagerProtocolPB currProxy) {
// Since FollowerReadInvocationHandler might user or fallback to
leader-based failover logic,
// we should delegate the failover logic to the leader's failover.
- failoverProxy.performFailover(currProxy);
+ leaderProxy.performFailover(currProxy);
}
public RetryPolicy getRetryPolicy(int maxFailovers) {
@@ -149,7 +150,7 @@ public RetryPolicy getRetryPolicy(int maxFailovers) {
// (when follower read is disabled or using write request)
// 2. The FollowerInvocationHandler is also written so that the thrown
exception is handled by the
// OMFailoverProxyProviderbase's RetryPolicy
- return failoverProxy.getRetryPolicy(maxFailovers);
+ return leaderProxy.getRetryPolicy(maxFailovers);
}
/**
@@ -202,15 +203,15 @@ public OMProxyInfo<OzoneManagerProtocolPB>
getCurrentProxy() {
* @return The new proxy that should be used.
*/
private synchronized OMProxyInfo<OzoneManagerProtocolPB>
changeProxy(OMProxyInfo<OzoneManagerProtocolPB> initial) {
- OMProxyInfo<OzoneManagerProtocolPB> currentProxy =
failoverProxy.getOMProxyMap().get(currentIndex);
+ OMProxyInfo<OzoneManagerProtocolPB> currentProxy =
leaderProxy.getOMProxyMap().get(currentIndex);
if (currentProxy != initial) {
// Must have been a concurrent modification; ignore the move request
return currentProxy;
}
- final OMProxyInfo.OrderedMap<OzoneManagerProtocolPB> omProxies =
failoverProxy.getOMProxyMap();
+ final OMProxyInfo.OrderedMap<OzoneManagerProtocolPB> omProxies =
leaderProxy.getOMProxyMap();
currentIndex = (currentIndex + 1) % omProxies.size();
final String currentOmNodeId = omProxies.getNodeId(currentIndex);
- currentProxy = failoverProxy.createOMProxyIfNeeded(currentOmNodeId);
+ currentProxy = leaderProxy.createOMProxyIfNeeded(currentOmNodeId);
LOG.debug("Changed current proxy from {} to {}",
initial == null ? "none" : initial.proxyInfo,
currentProxy.proxyInfo);
@@ -258,7 +259,7 @@ public Object invoke(Object proxy, final Method method,
final Object[] args)
if (isFollowerReadEligible) {
int failedCount = 0;
- for (int i = 0; useFollowerRead && i <
failoverProxy.getOMProxyMap().size(); i++) {
+ for (int i = 0; useFollowerRead && i <
leaderProxy.getOMProxyMap().size(); i++) {
OMProxyInfo<OzoneManagerProtocolPB> current = getCurrentProxy();
LOG.debug("Attempting to service {} with cmdType {} using proxy {}",
method.getName(), omRequest.getCmdType(), current.proxyInfo);
@@ -320,7 +321,7 @@ public Object invoke(Object proxy, final Method method,
final Object[] args)
}
}
- if (!failoverProxy.shouldFailover(e)) {
+ if (!leaderProxy.shouldFailover(e)) {
// We reuse the leader proxy provider failover since we want to
ensure
// if the follower read proxy decides that the exception should
be failed,
// the leader proxy provider failover retry policy (i.e.
OMFailoverProxyProviderBase#getRetryPolicy)
@@ -363,17 +364,17 @@ public Object invoke(Object proxy, final Method method,
final Object[] args)
// or this is a write request. In any case, forward the request to
// the leader OM.
LOG.debug("Using leader-based failoverProxy to service {}",
method.getName());
- final OMProxyInfo<OzoneManagerProtocolPB> leaderProxy =
failoverProxy.getProxy();
+ final OMProxyInfo<OzoneManagerProtocolPB> currentLeaderProxy =
leaderProxy.getProxy();
Object retVal = null;
try {
- retVal = method.invoke(leaderProxy.getProxy(), args);
+ retVal = method.invoke(currentLeaderProxy.getProxy(), args);
} catch (InvocationTargetException e) {
LOG.debug("Exception thrown from leader-based failoverProxy",
e.getCause());
// This exception will be handled by the
OMFailoverProxyProviderBase#getRetryPolicy
// (see getRetryPolicy). This ensures that the leader-only failover
should still work.
throwServiceException(e.getCause());
}
- lastProxy = leaderProxy;
+ lastProxy = currentLeaderProxy;
return retVal;
}
@@ -385,15 +386,15 @@ public void close() throws IOException {
@Override
public ConnectionId getConnectionId() {
return RPC.getConnectionIdForProxy(useFollowerRead
- ? getCurrentProxy().proxy : failoverProxy.getProxy().getProxy());
+ ? getCurrentProxy().proxy : leaderProxy.getProxy().getProxy());
}
}
@Override
public synchronized void close() throws IOException {
- // All the proxies are stored in the underlying failoverProxy
- // so we invoke close on the underlying failoverProxy
- failoverProxy.close();
+ // All the proxies are stored in the underlying leaderProxy
+ // so we invoke close on the underlying leaderProxy
+ leaderProxy.close();
}
@VisibleForTesting
@@ -403,22 +404,22 @@ public boolean isUseFollowerRead() {
@VisibleForTesting
public List<OMProxyInfo<OzoneManagerProtocolPB>> getOMProxies() {
- return failoverProxy.getOMProxies();
+ return leaderProxy.getOMProxies();
}
public synchronized void changeInitialProxyForTest(String initialOmNodeId) {
- final OMProxyInfo<OzoneManagerProtocolPB> currentProxy =
failoverProxy.getOMProxyMap().get(currentIndex);
+ final OMProxyInfo<OzoneManagerProtocolPB> currentProxy =
leaderProxy.getOMProxyMap().get(currentIndex);
if (currentProxy != null &&
currentProxy.getNodeId().equals(initialOmNodeId)) {
return;
}
- int indexOfTargetNodeId =
failoverProxy.getOMProxyMap().indexOf(initialOmNodeId);
- if (indexOfTargetNodeId == -1) {
+ Integer indexOfTargetNodeId =
leaderProxy.getOMProxyMap().indexOf(initialOmNodeId);
+ if (indexOfTargetNodeId == null) {
return;
}
currentIndex = indexOfTargetNodeId;
- failoverProxy.createOMProxyIfNeeded(initialOmNodeId);
+ leaderProxy.createOMProxyIfNeeded(initialOmNodeId);
}
/**
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
index c5a6988dc7f..98fcd66e762 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
@@ -37,7 +37,6 @@
import org.apache.hadoop.ozone.om.helpers.ReadConsistency;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
-import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ReadConsistencyHint;
import org.apache.hadoop.security.UserGroupInformation;
/**
@@ -53,9 +52,7 @@ public class Hadoop3OmTransport implements OmTransport {
private final OzoneManagerProtocolPB rpcProxy;
private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider;
- private final boolean followerReadEnabled;
- private final ReadConsistencyHint defaultLeaderReadConsistencyHint;
- private HadoopRpcOMFollowerReadFailoverProxyProvider
followerReadFailoverProxyProvider;
+ private final HadoopRpcOMFollowerReadFailoverProxyProvider
followerReadFailoverProxyProvider;
public Hadoop3OmTransport(ConfigurationSource conf,
UserGroupInformation ugi, String omServiceId) throws IOException {
@@ -67,10 +64,9 @@ public Hadoop3OmTransport(ConfigurationSource conf,
this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider<>(
conf, ugi, omServiceId, OzoneManagerProtocolPB.class);
- followerReadEnabled = conf.getBoolean(
+ boolean followerReadEnabled = conf.getBoolean(
OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY,
- OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_DEFAULT
- );
+ OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_DEFAULT);
int maxFailovers = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
@@ -79,37 +75,29 @@ public Hadoop3OmTransport(ConfigurationSource conf,
String defaultLeaderReadConsistencyStr =
conf.get(OZONE_CLIENT_LEADER_READ_DEFAULT_CONSISTENCY_KEY,
OZONE_CLIENT_LEADER_READ_DEFAULT_CONSISTENCY_DEFAULT);
ReadConsistency defaultLeaderReadConsistency =
ReadConsistency.valueOf(defaultLeaderReadConsistencyStr);
- defaultLeaderReadConsistencyHint = defaultLeaderReadConsistency.getHint();
// TODO: In the future, we might support more FollowerReadProxyProvider
strategies depending on factors
// like latency, applied index, etc.
// So instead of enabling using follower read configuration, we can
simply let user to configure the
// failover proxy provider instead (similar to
dfs.client.failover.proxy.provider.<nameservice>)
- if (followerReadEnabled) {
- String defaultFollowerReadConsistencyStr = conf.get(
- OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_KEY,
-
OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_DEFAULT
- );
- ReadConsistency defaultFollowerReadConsistency =
- ReadConsistency.valueOf(defaultFollowerReadConsistencyStr);
- this.followerReadFailoverProxyProvider =
- new
HadoopRpcOMFollowerReadFailoverProxyProvider(omFailoverProxyProvider,
- defaultFollowerReadConsistency,
- defaultLeaderReadConsistency);
- this.rpcProxy =
OzoneManagerProtocolPB.newProxy(followerReadFailoverProxyProvider,
maxFailovers);
- } else {
- // TODO HDDS-14682: It should be possible to simply instantiate
HadoopRpcOMFollowerReadFailoverProxyProvider
- // even if the follower read is not enabled. We can try this to ensure
that the tests still pass which
- // suggests that the HadoopRpcOMFollowerReadFailoverProxyProvider is a
indeed a superset of
- // HadoopRpcOMFollowerReadFailoverProxyProvider
- this.rpcProxy = OzoneManagerProtocolPB.newProxy(omFailoverProxyProvider,
maxFailovers);
- }
+ String defaultFollowerReadConsistencyStr = conf.get(
+ OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_KEY,
+ OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_DEFAULT
+ );
+ ReadConsistency defaultFollowerReadConsistency =
+ ReadConsistency.valueOf(defaultFollowerReadConsistencyStr);
+ this.followerReadFailoverProxyProvider =
+ new
HadoopRpcOMFollowerReadFailoverProxyProvider(omFailoverProxyProvider,
+ defaultFollowerReadConsistency,
+ defaultLeaderReadConsistency,
+ followerReadEnabled);
+ this.rpcProxy =
OzoneManagerProtocolPB.newProxy(followerReadFailoverProxyProvider,
maxFailovers);
}
@Override
public OMResponse submitRequest(OMRequest payload) throws IOException {
try {
- return rpcProxy.submitRequest(NULL_RPC_CONTROLLER,
getOMRequest(payload));
+ return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
} catch (ServiceException e) {
OMNotLeaderException notLeaderException =
HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
@@ -120,23 +108,6 @@ public OMResponse submitRequest(OMRequest payload) throws
IOException {
}
}
- private OMRequest getOMRequest(OMRequest basePayload) {
- // TODO HDDS-14682: We can remove this logic once we always use
FollowerReadProxyProvider
- if (followerReadEnabled) {
- // Follower read uses FollowerReadInvocationHandler to set the
invocation handler
- // Return the request payload as is
- return basePayload;
- }
- if (basePayload.hasReadConsistencyHint()) {
- // If there is already user-defined read consistency hint, we should
respect it
- return basePayload;
- }
-
- return basePayload.toBuilder()
- .setReadConsistencyHint(defaultLeaderReadConsistencyHint)
- .build();
- }
-
@Override
public Text getDelegationTokenService() {
return omFailoverProxyProvider.getCurrentProxyDelegationToken();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
index d984ced1fc2..9262da093a4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAFollowerReadWithAllRunning.java
@@ -31,7 +31,6 @@
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -487,7 +486,9 @@ void testClientWithFollowerReadDisabled() throws Exception {
// This will trigger getServiceId so the client should point to the
leader
leaderOnlyClient = OzoneClientFactory.getRpcClient(getOmServiceId(),
clientConf);
ObjectStore leaderOnlyObjectStore = leaderOnlyClient.getObjectStore();
-
assertNull(OmTestUtil.getFollowerReadFailoverProxyProvider(leaderOnlyObjectStore));
+ // FollowerReadProxyProvider is the unified proxy provider regardless of
whether follower
+ // read is enabled or not
+
assertNotNull(OmTestUtil.getFollowerReadFailoverProxyProvider(leaderOnlyObjectStore));
HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
leaderProxyProvider =
OmTestUtil.getFailoverProxyProvider(leaderOnlyObjectStore);
diff --git a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
index b1f0e5f1bb4..c03a5173806 100644
--- a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
+++ b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
@@ -38,7 +38,6 @@
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
-import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ReadConsistencyHint;
import org.apache.hadoop.security.UserGroupInformation;
/**
@@ -51,9 +50,7 @@ public class Hadoop27RpcTransport implements OmTransport {
private final OzoneManagerProtocolPB rpcProxy;
private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
omFailoverProxyProvider;
- private final boolean followerReadEnabled;
- private final ReadConsistencyHint defaultLeaderReadConsistencyHint;
- private HadoopRpcOMFollowerReadFailoverProxyProvider
followerReadFailoverProxyProvider;
+ private final HadoopRpcOMFollowerReadFailoverProxyProvider
followerReadFailoverProxyProvider;
public Hadoop27RpcTransport(ConfigurationSource conf,
UserGroupInformation ugi, String omServiceId) throws IOException {
@@ -65,10 +62,9 @@ public Hadoop27RpcTransport(ConfigurationSource conf,
this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider<>(
conf, ugi, omServiceId, OzoneManagerProtocolPB.class);
- followerReadEnabled = conf.getBoolean(
+ boolean followerReadEnabled = conf.getBoolean(
OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_KEY,
- OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_DEFAULT
- );
+ OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_ENABLED_DEFAULT);
int maxFailovers = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
@@ -77,34 +73,25 @@ public Hadoop27RpcTransport(ConfigurationSource conf,
String defaultLeaderReadConsistencyStr =
conf.get(OZONE_CLIENT_LEADER_READ_DEFAULT_CONSISTENCY_KEY,
OZONE_CLIENT_LEADER_READ_DEFAULT_CONSISTENCY_DEFAULT);
ReadConsistency defaultLeaderReadConsistency =
ReadConsistency.valueOf(defaultLeaderReadConsistencyStr);
- defaultLeaderReadConsistencyHint = defaultLeaderReadConsistency.getHint();
-
- if (followerReadEnabled) {
- String defaultFollowerReadConsistencyStr = conf.get(
- OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_KEY,
-
OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_DEFAULT
- );
- ReadConsistency defaultFollowerReadConsistency =
- ReadConsistency.valueOf(defaultFollowerReadConsistencyStr);
- this.followerReadFailoverProxyProvider =
- new
HadoopRpcOMFollowerReadFailoverProxyProvider(omFailoverProxyProvider,
- defaultFollowerReadConsistency,
- defaultLeaderReadConsistency);
- this.rpcProxy =
OzoneManagerProtocolPB.newProxy(followerReadFailoverProxyProvider,
maxFailovers);
- } else {
- // TODO: It should be possible to simply instantiate
HadoopRpcOMFollowerReadFailoverProxyProvider
- // even if the follower read is not enabled. We can try this to ensure
that the tests still pass which
- // suggests that the HadoopRpcOMFollowerReadFailoverProxyProvider is a
indeed a superset of
- // HadoopRpcOMFollowerReadFailoverProxyProvider
- this.rpcProxy = OzoneManagerProtocolPB.newProxy(omFailoverProxyProvider,
maxFailovers);
- }
+ String defaultFollowerReadConsistencyStr = conf.get(
+ OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_KEY,
+ OzoneConfigKeys.OZONE_CLIENT_FOLLOWER_READ_DEFAULT_CONSISTENCY_DEFAULT
+ );
+ ReadConsistency defaultFollowerReadConsistency =
+ ReadConsistency.valueOf(defaultFollowerReadConsistencyStr);
+ this.followerReadFailoverProxyProvider =
+ new
HadoopRpcOMFollowerReadFailoverProxyProvider(omFailoverProxyProvider,
+ defaultFollowerReadConsistency,
+ defaultLeaderReadConsistency,
+ followerReadEnabled);
+ this.rpcProxy =
OzoneManagerProtocolPB.newProxy(followerReadFailoverProxyProvider,
maxFailovers);
}
@Override
public OMResponse submitRequest(OMRequest payload) throws IOException {
try {
- return rpcProxy.submitRequest(NULL_RPC_CONTROLLER,
getOMRequest(payload));
+ return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
} catch (ServiceException e) {
OMNotLeaderException notLeaderException =
HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
@@ -115,22 +102,6 @@ public OMResponse submitRequest(OMRequest payload) throws
IOException {
}
}
- private OMRequest getOMRequest(OMRequest basePayload) {
- if (followerReadEnabled) {
- // Follower read uses FollowerReadInvocationHandler to set the
invocation handler
- // Return the request payload as is
- return basePayload;
- }
- if (basePayload.hasReadConsistencyHint()) {
- // If there is already user-defined read consistency hint, we should
respect it
- return basePayload;
- }
-
- return basePayload.toBuilder()
- .setReadConsistencyHint(defaultLeaderReadConsistencyHint)
- .build();
- }
-
@Override
public Text getDelegationTokenService() {
return null;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]