This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 526eec08a45 HDDS-15264. Fork RetryInvocationHandler from Hadoop
(#10263)
526eec08a45 is described below
commit 526eec08a45a4f9a25405e3546e8ddb9b15b5b85
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Sat May 16 19:48:38 2026 +0200
HDDS-15264. Fork RetryInvocationHandler from Hadoop (#10263)
---
.../apache/hadoop/io_/retry/AsyncCallHandler.java | 323 ++++++++++++++
.../org/apache/hadoop/io_/retry/CallReturn.java | 79 ++++
.../hadoop/io_/retry/RetryInvocationHandler.java | 466 +++++++++++++++++++++
.../org/apache/hadoop/io_/retry/RetryProxy.java | 69 +++
.../SCMSecurityProtocolClientSideTranslatorPB.java | 2 +-
.../SecretKeyProtocolClientSideTranslatorPB.java | 2 +-
...lockLocationProtocolClientSideTranslatorPB.java | 2 +-
...inerLocationProtocolClientSideTranslatorPB.java | 2 +-
.../protocolPB/OMAdminProtocolClientSideImpl.java | 2 +-
.../OMInterServiceProtocolClientSideImpl.java | 2 +-
.../om/protocolPB/OzoneManagerProtocolPB.java | 2 +-
...doopRpcOMFollowerReadFailoverProxyProvider.java | 4 +-
hadoop-ozone/dist/src/main/compose/test-all.sh | 2 +-
.../hadoop/ozone/shell/TestOzoneTenantShell.java | 2 +-
.../hadoop/ozone/om/failover/TestOMFailovers.java | 2 +-
pom.xml | 2 +-
16 files changed, 950 insertions(+), 13 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/AsyncCallHandler.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/AsyncCallHandler.java
new file mode 100644
index 00000000000..e473c4d7fab
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/AsyncCallHandler.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io_.retry;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ipc_.Client;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.AsyncGet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Handle async calls. */
+public class AsyncCallHandler {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ AsyncCallHandler.class);
+
+ private static final ThreadLocal<AsyncGet<?, Exception>>
+ LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>();
+ private static final ThreadLocal<AsyncGet<Object, Throwable>>
+ ASYNC_RETURN = new ThreadLocal<>();
+
+ private static AsyncGet<?, Exception> getLowerLayerAsyncReturn() {
+ final AsyncGet<?, Exception> asyncGet = LOWER_LAYER_ASYNC_RETURN.get();
+ Preconditions.checkNotNull(asyncGet);
+ LOWER_LAYER_ASYNC_RETURN.set(null);
+ return asyncGet;
+ }
+
+ /** A simple concurrent queue which keeping track the empty start time. */
+ static class ConcurrentQueue<T> {
+ private final Queue<T> queue = new ConcurrentLinkedQueue<>();
+ private final AtomicLong emptyStartTime
+ = new AtomicLong(Time.monotonicNow());
+
+ Iterator<T> iterator() {
+ return queue.iterator();
+ }
+
+ /** Is the queue empty for more than the given time in millisecond? */
+ boolean isEmpty(long time) {
+ return Time.monotonicNow() - emptyStartTime.get() > time
+ && queue.isEmpty();
+ }
+
+ void offer(T c) {
+ final boolean added = queue.offer(c);
+ Preconditions.checkState(added);
+ }
+
+ void checkEmpty() {
+ if (queue.isEmpty()) {
+ emptyStartTime.set(Time.monotonicNow());
+ }
+ }
+ }
+
+ /** A queue for handling async calls. */
+ class AsyncCallQueue {
+ private final ConcurrentQueue<AsyncCall> queue = new ConcurrentQueue<>();
+ private final Processor processor = new Processor();
+
+ void addCall(AsyncCall call) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("add " + call);
+ }
+ queue.offer(call);
+ processor.tryStart();
+ }
+
+ long checkCalls() {
+ final long startTime = Time.monotonicNow();
+ long minWaitTime = Processor.MAX_WAIT_PERIOD;
+
+ for (final Iterator<AsyncCall> i = queue.iterator(); i.hasNext();) {
+ final AsyncCall c = i.next();
+ if (c.isDone()) {
+ i.remove(); // the call is done, remove it from the queue.
+ queue.checkEmpty();
+ } else {
+ final Long waitTime = c.getWaitTime(startTime);
+ if (waitTime != null && waitTime > 0 && waitTime < minWaitTime) {
+ minWaitTime = waitTime;
+ }
+ }
+ }
+ return minWaitTime;
+ }
+
+ /** Process the async calls in the queue. */
+ private class Processor {
+ static final long GRACE_PERIOD = 3*1000L;
+ static final long MAX_WAIT_PERIOD = 100L;
+
+ private final AtomicReference<Thread> running = new AtomicReference<>();
+
+ boolean isRunning(Daemon d) {
+ return d == running.get();
+ }
+
+ void tryStart() {
+ final Thread current = Thread.currentThread();
+ if (running.compareAndSet(null, current)) {
+ final Daemon daemon = new Daemon() {
+ @Override
+ public void run() {
+ for (; isRunning(this);) {
+ final long waitTime = checkCalls();
+ tryStop(this);
+
+ try {
+ synchronized (AsyncCallHandler.this) {
+ AsyncCallHandler.this.wait(waitTime);
+ }
+ } catch (InterruptedException e) {
+ kill(this);
+ }
+ }
+ }
+ };
+
+ final boolean set = running.compareAndSet(current, daemon);
+ Preconditions.checkState(set);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting AsyncCallQueue.Processor " + daemon);
+ }
+ daemon.start();
+ }
+ }
+
+ void tryStop(Daemon d) {
+ if (queue.isEmpty(GRACE_PERIOD)) {
+ kill(d);
+ }
+ }
+
+ void kill(Daemon d) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Killing " + d);
+ }
+ final boolean set = running.compareAndSet(d, null);
+ Preconditions.checkState(set);
+ }
+ }
+ }
+
+ static class AsyncValue<V> {
+ private V value;
+
+ synchronized V waitAsyncValue(long timeout, TimeUnit unit)
+ throws InterruptedException, TimeoutException {
+ if (value != null) {
+ return value;
+ }
+ AsyncGet.Util.wait(this, timeout, unit);
+ if (value != null) {
+ return value;
+ }
+
+ throw new TimeoutException("waitCallReturn timed out "
+ + timeout + " " + unit);
+ }
+
+ synchronized void set(V v) {
+ Preconditions.checkNotNull(v);
+ Preconditions.checkState(value == null);
+ value = v;
+ notify();
+ }
+
+ synchronized boolean isDone() {
+ return value != null;
+ }
+ }
+
+ static class AsyncCall extends RetryInvocationHandler.Call {
+ private final AsyncCallHandler asyncCallHandler;
+
+ private final AsyncValue<CallReturn> asyncCallReturn = new AsyncValue<>();
+ private AsyncGet<?, Exception> lowerLayerAsyncGet;
+
+ AsyncCall(Method method, Object[] args, boolean isRpc, int callId,
+ RetryInvocationHandler<?> retryInvocationHandler,
+ AsyncCallHandler asyncCallHandler) {
+ super(method, args, isRpc, callId, retryInvocationHandler);
+
+ this.asyncCallHandler = asyncCallHandler;
+ }
+
+ /** @return true if the call is done; otherwise, return false. */
+ boolean isDone() {
+ final CallReturn r = invokeOnce();
+ LOG.debug("#{}: {}", getCallId(), r.getState());
+ switch (r.getState()) {
+ case RETURNED:
+ case EXCEPTION:
+ asyncCallReturn.set(r); // the async call is done
+ return true;
+ case RETRY:
+ invokeOnce();
+ break;
+ case WAIT_RETRY:
+ case ASYNC_CALL_IN_PROGRESS:
+ case ASYNC_INVOKED:
+ // nothing to do
+ break;
+ default:
+ Preconditions.checkState(false);
+ }
+ return false;
+ }
+
+ @Override
+ CallReturn processWaitTimeAndRetryInfo() {
+ final Long waitTime = getWaitTime(Time.monotonicNow());
+ LOG.trace("#{} processRetryInfo: waitTime={}", getCallId(), waitTime);
+ if (waitTime != null && waitTime > 0) {
+ return CallReturn.WAIT_RETRY;
+ }
+ processRetryInfo();
+ return CallReturn.RETRY;
+ }
+
+ @Override
+ CallReturn invoke() throws Throwable {
+ LOG.debug("{}.invoke {}", getClass().getSimpleName(), this);
+ if (lowerLayerAsyncGet != null) {
+ // async call was submitted early, check the lower level async call
+ final boolean isDone = lowerLayerAsyncGet.isDone();
+ LOG.trace("#{} invoke: lowerLayerAsyncGet.isDone()? {}",
+ getCallId(), isDone);
+ if (!isDone) {
+ return CallReturn.ASYNC_CALL_IN_PROGRESS;
+ }
+ try {
+ return new CallReturn(lowerLayerAsyncGet.get(0, TimeUnit.SECONDS));
+ } finally {
+ lowerLayerAsyncGet = null;
+ }
+ }
+
+ // submit a new async call
+ LOG.trace("#{} invoke: ASYNC_INVOKED", getCallId());
+ final boolean mode = Client.isAsynchronousMode();
+ try {
+ Client.setAsynchronousMode(true);
+ final Object r = invokeMethod();
+ // invokeMethod should set LOWER_LAYER_ASYNC_RETURN and return null.
+ Preconditions.checkState(r == null);
+ lowerLayerAsyncGet = getLowerLayerAsyncReturn();
+
+ if (getCounters().isZeros()) {
+ // first async attempt, initialize
+ LOG.trace("#{} invoke: initAsyncCall", getCallId());
+ asyncCallHandler.initAsyncCall(this, asyncCallReturn);
+ }
+ return CallReturn.ASYNC_INVOKED;
+ } finally {
+ Client.setAsynchronousMode(mode);
+ }
+ }
+ }
+
+ private final AsyncCallQueue asyncCalls = new AsyncCallQueue();
+ private volatile boolean hasSuccessfulCall = false;
+
+ AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc,
+ int callId,
+ RetryInvocationHandler<?> retryInvocationHandler) {
+ return new AsyncCall(method, args, isRpc, callId,
+ retryInvocationHandler, this);
+ }
+
+ boolean hasSuccessfulCall() {
+ return hasSuccessfulCall;
+ }
+
+ private void initAsyncCall(final AsyncCall asyncCall,
+ final AsyncValue<CallReturn> asyncCallReturn) {
+ asyncCalls.addCall(asyncCall);
+
+ final AsyncGet<Object, Throwable> asyncGet
+ = new AsyncGet<Object, Throwable>() {
+ @Override
+ public Object get(long timeout, TimeUnit unit) throws Throwable {
+ final CallReturn c = asyncCallReturn.waitAsyncValue(timeout, unit);
+ final Object r = c.getReturnValue();
+ hasSuccessfulCall = true;
+ return r;
+ }
+
+ @Override
+ public boolean isDone() {
+ return asyncCallReturn.isDone();
+ }
+ };
+ ASYNC_RETURN.set(asyncGet);
+ }
+}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/CallReturn.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/CallReturn.java
new file mode 100644
index 00000000000..a65125a68db
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/CallReturn.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io_.retry;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.retry.RetryPolicy;
+
+/** The call return from a method invocation. */
+class CallReturn {
+ /** The return state. */
+ enum State {
+ /** Call is returned successfully. */
+ RETURNED,
+ /** Call throws an exception. */
+ EXCEPTION,
+ /** Call should be retried according to the {@link RetryPolicy}. */
+ RETRY,
+ /** Call should wait and then retry according to the {@link RetryPolicy}.
*/
+ WAIT_RETRY,
+ /** Call, which is async, is still in progress. */
+ ASYNC_CALL_IN_PROGRESS,
+ /** Call, which is async, just has been invoked. */
+ ASYNC_INVOKED
+ }
+
+ static final CallReturn ASYNC_CALL_IN_PROGRESS = new CallReturn(
+ State.ASYNC_CALL_IN_PROGRESS);
+ static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED);
+ static final CallReturn RETRY = new CallReturn(State.RETRY);
+ static final CallReturn WAIT_RETRY = new CallReturn(State.WAIT_RETRY);
+
+ private final Object returnValue;
+ private final Throwable thrown;
+ private final State state;
+
+ CallReturn(Object r) {
+ this(r, null, State.RETURNED);
+ }
+ CallReturn(Throwable t) {
+ this(null, t, State.EXCEPTION);
+ Preconditions.checkNotNull(t);
+ }
+ private CallReturn(State s) {
+ this(null, null, s);
+ }
+ private CallReturn(Object r, Throwable t, State s) {
+ Preconditions.checkArgument(r == null || t == null);
+ returnValue = r;
+ thrown = t;
+ state = s;
+ }
+
+ State getState() {
+ return state;
+ }
+
+ Object getReturnValue() throws Throwable {
+ if (state == State.EXCEPTION) {
+ throw thrown;
+ }
+ Preconditions.checkState(state == State.RETURNED, "state == %s", state);
+ return returnValue;
+ }
+}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/RetryInvocationHandler.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/RetryInvocationHandler.java
new file mode 100644
index 00000000000..82d8994c5c8
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/RetryInvocationHandler.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io_.retry;
+
+import org.apache.hadoop.io.retry.AtMostOnce;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
+import org.apache.hadoop.io.retry.Idempotent;
+import org.apache.hadoop.io.retry.MultiException;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.ipc_.*;
+import org.apache.hadoop.ipc_.Client.ConnectionId;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * A {@link RpcInvocationHandler} which supports client side retry .
+ */
+public class RetryInvocationHandler<T> implements RpcInvocationHandler {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ RetryInvocationHandler.class);
+
+ public static final ThreadLocal<Boolean> SET_CALL_ID_FOR_TEST =
+ ThreadLocal.withInitial(() -> true);
+
+ static class Call {
+ private final Method method;
+ private final Object[] args;
+ private final boolean isRpc;
+ private final int callId;
+ private final Counters counters = new Counters();
+
+ private final RetryPolicy retryPolicy;
+ private final RetryInvocationHandler<?> retryInvocationHandler;
+
+ private RetryInfo retryInfo;
+
+ Call(Method method, Object[] args, boolean isRpc, int callId,
+ RetryInvocationHandler<?> retryInvocationHandler) {
+ this.method = method;
+ this.args = args;
+ this.isRpc = isRpc;
+ this.callId = callId;
+
+ this.retryPolicy = retryInvocationHandler.getRetryPolicy(method);
+ this.retryInvocationHandler = retryInvocationHandler;
+ }
+
+ int getCallId() {
+ return callId;
+ }
+
+ Counters getCounters() {
+ return counters;
+ }
+
+ synchronized Long getWaitTime(final long now) {
+ return retryInfo == null? null: retryInfo.retryTime - now;
+ }
+
+ /** Invoke the call once without retrying. */
+ synchronized CallReturn invokeOnce() {
+ try {
+ if (retryInfo != null) {
+ return processWaitTimeAndRetryInfo();
+ }
+
+ // The number of times this invocation handler has ever been failed
over
+ // before this method invocation attempt. Used to prevent concurrent
+ // failed method invocations from triggering multiple failover
attempts.
+ final long failoverCount = retryInvocationHandler.getFailoverCount();
+ try {
+ return invoke();
+ } catch (Exception e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(toString(), e);
+ }
+ if (Thread.currentThread().isInterrupted()) {
+ // If interrupted, do not retry.
+ throw e;
+ }
+
+ retryInfo = retryInvocationHandler.handleException(
+ method, callId, retryPolicy, counters, failoverCount, e);
+ return processWaitTimeAndRetryInfo();
+ }
+ } catch(Throwable t) {
+ return new CallReturn(t);
+ }
+ }
+
+ /**
+ * It first processes the wait time, if there is any,
+ * and then invokes {@link #processRetryInfo()}.
+ *
+ * If the wait time is positive, it either sleeps for synchronous calls
+ * or immediately returns for asynchronous calls.
+ *
+ * @return {@link CallReturn#RETRY} if the retryInfo is processed;
+ * otherwise, return {@link CallReturn#WAIT_RETRY}.
+ */
+ CallReturn processWaitTimeAndRetryInfo() throws InterruptedIOException {
+ final Long waitTime = getWaitTime(Time.monotonicNow());
+ LOG.trace("#{} processRetryInfo: retryInfo={}, waitTime={}",
+ callId, retryInfo, waitTime);
+ if (waitTime != null && waitTime > 0) {
+ try {
+ Thread.sleep(retryInfo.delay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Interrupted while waiting to retry", e);
+ }
+ InterruptedIOException intIOE = new InterruptedIOException(
+ "Retry interrupted");
+ intIOE.initCause(e);
+ throw intIOE;
+ }
+ }
+ processRetryInfo();
+ return CallReturn.RETRY;
+ }
+
+ synchronized void processRetryInfo() {
+ counters.retries++;
+ if (retryInfo.isFailover()) {
+ retryInvocationHandler.proxyDescriptor.failover(
+ retryInfo.expectedFailoverCount, method, callId);
+ counters.failovers++;
+ }
+ retryInfo = null;
+ }
+
+ CallReturn invoke() throws Throwable {
+ return new CallReturn(invokeMethod());
+ }
+
+ Object invokeMethod() throws Throwable {
+ if (isRpc && SET_CALL_ID_FOR_TEST.get()) {
+ Client.setCallIdAndRetryCount(callId, counters.retries,
+ retryInvocationHandler.asyncCallHandler);
+ }
+ return retryInvocationHandler.invokeMethod(method, args);
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "#" + callId + ": "
+ + method.getDeclaringClass().getSimpleName() + "." + method.getName()
+ + "(" + (args == null || args.length == 0? "": Arrays.toString(args))
+ + ")";
+ }
+ }
+
+ static class Counters {
+ /** Counter for retries. */
+ private int retries;
+ /** Counter for method invocation has been failed over. */
+ private int failovers;
+
+ boolean isZeros() {
+ return retries == 0 && failovers == 0;
+ }
+ }
+
+ private static class ProxyDescriptor<T> {
+ private final FailoverProxyProvider<T> fpp;
+ /** Count the associated proxy provider has ever been failed over. */
+ private long failoverCount = 0;
+
+ private ProxyInfo<T> proxyInfo;
+
+ ProxyDescriptor(FailoverProxyProvider<T> fpp) {
+ this.fpp = fpp;
+ this.proxyInfo = fpp.getProxy();
+ }
+
+ synchronized ProxyInfo<T> getProxyInfo() {
+ return proxyInfo;
+ }
+
+ synchronized T getProxy() {
+ return proxyInfo.proxy;
+ }
+
+ synchronized long getFailoverCount() {
+ return failoverCount;
+ }
+
+ synchronized void failover(long expectedFailoverCount, Method method,
+ int callId) {
+ // Make sure that concurrent failed invocations only cause a single
+ // actual failover.
+ if (failoverCount == expectedFailoverCount) {
+ fpp.performFailover(proxyInfo.proxy);
+ failoverCount++;
+ } else {
+ LOG.warn("A failover has occurred since the start of call #" + callId
+ + " " + proxyInfo.getString(method.getName()));
+ }
+ proxyInfo = fpp.getProxy();
+ }
+
+ boolean idempotentOrAtMostOnce(Method method) throws NoSuchMethodException
{
+ final Method m = fpp.getInterface()
+ .getMethod(method.getName(), method.getParameterTypes());
+ return m.isAnnotationPresent(Idempotent.class)
+ || m.isAnnotationPresent(AtMostOnce.class);
+ }
+
+ void close() throws IOException {
+ fpp.close();
+ }
+ }
+
+ private static class RetryInfo {
+ private final long retryTime;
+ private final long delay;
+ private final RetryAction action;
+ private final long expectedFailoverCount;
+ private final Exception failException;
+
+ RetryInfo(long delay, RetryAction action, long expectedFailoverCount,
+ Exception failException) {
+ this.delay = delay;
+ this.retryTime = Time.monotonicNow() + delay;
+ this.action = action;
+ this.expectedFailoverCount = expectedFailoverCount;
+ this.failException = failException;
+ }
+
+ boolean isFailover() {
+ return action != null
+ && action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY;
+ }
+
+ boolean isFail() {
+ return action != null
+ && action.action == RetryAction.RetryDecision.FAIL;
+ }
+
+ Exception getFailException() {
+ return failException;
+ }
+
+ static RetryInfo newRetryInfo(RetryPolicy policy, Exception e,
+ Counters counters, boolean idempotentOrAtMostOnce,
+ long expectedFailoverCount) throws Exception {
+ RetryAction max = null;
+ long maxRetryDelay = 0;
+ Exception ex = null;
+
+ final Iterable<Exception> exceptions = e instanceof MultiException ?
+ ((MultiException) e).getExceptions().values()
+ : Collections.singletonList(e);
+ for (Exception exception : exceptions) {
+ final RetryAction a = policy.shouldRetry(exception,
+ counters.retries, counters.failovers, idempotentOrAtMostOnce);
+ if (a.action != RetryAction.RetryDecision.FAIL) {
+ // must be a retry or failover
+ if (a.delayMillis > maxRetryDelay) {
+ maxRetryDelay = a.delayMillis;
+ }
+ }
+
+ if (max == null || max.action.compareTo(a.action) < 0) {
+ max = a;
+ if (a.action == RetryAction.RetryDecision.FAIL) {
+ ex = exception;
+ }
+ }
+ }
+
+ return new RetryInfo(maxRetryDelay, max, expectedFailoverCount, ex);
+ }
+
+ @Override
+ public String toString() {
+ return "RetryInfo{" +
+ "retryTime=" + retryTime +
+ ", delay=" + delay +
+ ", action=" + action +
+ ", expectedFailoverCount=" + expectedFailoverCount +
+ ", failException=" + failException +
+ '}';
+ }
+ }
+
+ private final ProxyDescriptor<T> proxyDescriptor;
+
+ private volatile boolean hasSuccessfulCall = false;
+
+ private HashSet<String> failedAtLeastOnce = new HashSet<>();
+
+ private final RetryPolicy defaultPolicy;
+ private final Map<String,RetryPolicy> methodNameToPolicyMap;
+
+ private final AsyncCallHandler asyncCallHandler = new AsyncCallHandler();
+
+ protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
+ RetryPolicy retryPolicy) {
+ this(proxyProvider, retryPolicy, Collections.<String,
RetryPolicy>emptyMap());
+ }
+
+ protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
+ RetryPolicy defaultPolicy,
+ Map<String, RetryPolicy> methodNameToPolicyMap) {
+ this.proxyDescriptor = new ProxyDescriptor<>(proxyProvider);
+ this.defaultPolicy = defaultPolicy;
+ this.methodNameToPolicyMap = methodNameToPolicyMap;
+ }
+
+ private RetryPolicy getRetryPolicy(Method method) {
+ final RetryPolicy policy = methodNameToPolicyMap.get(method.getName());
+ return policy != null? policy: defaultPolicy;
+ }
+
+ private long getFailoverCount() {
+ return proxyDescriptor.getFailoverCount();
+ }
+
+ private Call newCall(Method method, Object[] args, boolean isRpc,
+ int callId) {
+ if (Client.isAsynchronousMode()) {
+ return asyncCallHandler.newAsyncCall(method, args, isRpc, callId, this);
+ } else {
+ return new Call(method, args, isRpc, callId, this);
+ }
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
+ final int callId = isRpc? Client.nextCallId():
RpcConstants.INVALID_CALL_ID;
+
+ final Call call = newCall(method, args, isRpc, callId);
+ while (true) {
+ final CallReturn c = call.invokeOnce();
+ final CallReturn.State state = c.getState();
+ if (state == CallReturn.State.ASYNC_INVOKED) {
+ return null; // return null for async calls
+ } else if (c.getState() != CallReturn.State.RETRY) {
+ return c.getReturnValue();
+ }
+ }
+ }
+
+ private RetryInfo handleException(final Method method, final int callId,
+ final RetryPolicy policy, final Counters counters,
+ final long expectFailoverCount, final Exception e) throws Exception {
+ final RetryInfo retryInfo = RetryInfo.newRetryInfo(policy, e,
+ counters, proxyDescriptor.idempotentOrAtMostOnce(method),
+ expectFailoverCount);
+ if (retryInfo.isFail()) {
+ // fail.
+ if (retryInfo.action.reason != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exception while invoking call #" + callId + " "
+ + proxyDescriptor.getProxyInfo().getString(method.getName())
+ + ". Not retrying because " + retryInfo.action.reason, e);
+ }
+ }
+ throw retryInfo.getFailException();
+ }
+
+ log(method, retryInfo.isFailover(), counters.failovers, counters.retries,
retryInfo.delay, e);
+ return retryInfo;
+ }
+
+ private void log(final Method method, final boolean isFailover, final int
failovers,
+ final int retries, final long delay, final Exception ex) {
+ boolean info = true;
+ // If this is the first failover to this proxy, skip logging at INFO level
+ if (!failedAtLeastOnce.contains(proxyDescriptor.getProxyInfo().toString()))
+ {
+ failedAtLeastOnce.add(proxyDescriptor.getProxyInfo().toString());
+
+ // If successful calls were made to this proxy, log info even for first
+ // failover
+ info = hasSuccessfulCall || asyncCallHandler.hasSuccessfulCall();
+ if (!info && !LOG.isDebugEnabled()) {
+ return;
+ }
+ }
+
+ final StringBuilder b = new StringBuilder()
+ .append(ex)
+ .append(", while invoking ")
+ .append(proxyDescriptor.getProxyInfo().getString(method.getName()));
+ if (failovers > 0) {
+ b.append(" after ").append(failovers).append(" failover attempts");
+ }
+ b.append(isFailover? ". Trying to failover ": ". Retrying ");
+ b.append(delay > 0? "after sleeping for " + delay + "ms.": "immediately.");
+ b.append(" Current retry count: ").append(retries).append(".");
+
+ if (info) {
+ LOG.info(b.toString());
+ } else {
+ LOG.debug(b.toString(), ex);
+ }
+ }
+
+ protected Object invokeMethod(Method method, Object[] args) throws Throwable
{
+ try {
+ if (!method.isAccessible()) {
+ method.setAccessible(true);
+ }
+ final Object r = method.invoke(proxyDescriptor.getProxy(), args);
+ hasSuccessfulCall = true;
+ return r;
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ }
+
+ static boolean isRpcInvocation(Object proxy) {
+ if (proxy instanceof ProtocolTranslator) {
+ proxy = ((ProtocolTranslator) proxy).getUnderlyingProxyObject();
+ }
+ if (!Proxy.isProxyClass(proxy.getClass())) {
+ return false;
+ }
+ final InvocationHandler ih = Proxy.getInvocationHandler(proxy);
+ return ih instanceof RpcInvocationHandler;
+ }
+
+ @Override
+ public void close() throws IOException {
+ proxyDescriptor.close();
+ }
+
+ @Override //RpcInvocationHandler
+ public ConnectionId getConnectionId() {
+ return RPC.getConnectionIdForProxy(proxyDescriptor.getProxy());
+ }
+}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/RetryProxy.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/RetryProxy.java
new file mode 100644
index 00000000000..210e5e3b655
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/io_/retry/RetryProxy.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io_.retry;
+
+import java.lang.reflect.Proxy;
+import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicy;
+
+/**
+ * <p>
+ * A factory for creating retry proxies.
+ * </p>
+ */
+public class RetryProxy {
+ /**
+ * <p>
+ * Create a proxy for an interface of an implementation class
+ * using the same retry policy for each method in the interface.
+ * </p>
+ * @param iface the interface that the retry will implement
+ * @param implementation the instance whose methods should be retried
+ * @param retryPolicy the policy for retrying method call failures
+ * @param <T> T.
+ * @return the retry proxy
+ */
+ public static <T> Object create(Class<T> iface, T implementation,
+ RetryPolicy retryPolicy) {
+ return RetryProxy.create(iface,
+ new DefaultFailoverProxyProvider<T>(iface, implementation),
+ retryPolicy);
+ }
+
+ /**
+ * Create a proxy for an interface of implementations of that interface using
+ * the given {@link FailoverProxyProvider} and the same retry policy for each
+ * method in the interface.
+ *
+ * @param iface the interface that the retry will implement
+ * @param proxyProvider provides implementation instances whose methods
should be retried
+ * @param retryPolicy the policy for retrying or failing over method call
failures
+ * @param <T> T.
+ * @return the retry proxy
+ */
+ public static <T> Object create(Class<T> iface,
+ FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
+ return Proxy.newProxyInstance(
+ proxyProvider.getInterface().getClassLoader(),
+ new Class<?>[] { iface },
+ new RetryInvocationHandler<T>(proxyProvider, retryPolicy)
+ );
+ }
+
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
index 2603f440a11..9b08f306701 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
@@ -49,7 +49,7 @@
import
org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.tracing.TracingUtil;
-import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io_.retry.RetryProxy;
import org.apache.hadoop.ipc_.ProtobufHelper;
import org.apache.hadoop.ipc_.ProtocolTranslator;
import org.apache.hadoop.ipc_.RPC;
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SecretKeyProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SecretKeyProtocolClientSideTranslatorPB.java
index a4f7106b554..add4da85317 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SecretKeyProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SecretKeyProtocolClientSideTranslatorPB.java
@@ -41,7 +41,7 @@
import org.apache.hadoop.hdds.security.exception.SCMSecretKeyException;
import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
import org.apache.hadoop.hdds.tracing.TracingUtil;
-import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io_.retry.RetryProxy;
import org.apache.hadoop.ipc_.ProtobufHelper;
import org.apache.hadoop.ipc_.ProtocolTranslator;
import org.apache.hadoop.ipc_.RPC;
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index c862030aa22..717b6db2c93 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -66,7 +66,7 @@
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
import org.apache.hadoop.hdds.tracing.TracingUtil;
-import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io_.retry.RetryProxy;
import org.apache.hadoop.ipc_.ProtobufHelper;
import org.apache.hadoop.ipc_.ProtocolTranslator;
import org.apache.hadoop.ozone.ClientVersion;
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 56e2ef6408f..3455a090859 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -141,7 +141,7 @@
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import
org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
import org.apache.hadoop.hdds.tracing.TracingUtil;
-import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io_.retry.RetryProxy;
import org.apache.hadoop.ipc_.ProtobufHelper;
import org.apache.hadoop.ipc_.ProtocolTranslator;
import org.apache.hadoop.ipc_.RPC;
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
index 8919a2479ef..13889d98456 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
@@ -27,7 +27,7 @@
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io_.retry.RetryProxy;
import org.apache.hadoop.ipc_.ProtobufHelper;
import org.apache.hadoop.ipc_.ProtobufRpcEngine;
import org.apache.hadoop.ipc_.RPC;
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java
index 28924f02d17..903c382d317 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java
@@ -22,7 +22,7 @@
import java.io.IOException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io_.retry.RetryProxy;
import org.apache.hadoop.ipc_.ProtobufHelper;
import org.apache.hadoop.ipc_.ProtobufRpcEngine;
import org.apache.hadoop.ipc_.RPC;
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
index 6aa9c4da594..d11aa7e6aab 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolPB.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.ozone.om.protocolPB;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
-import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io_.retry.RetryProxy;
import org.apache.hadoop.ipc_.ProtocolInfo;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import
org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
diff --git
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestHadoopRpcOMFollowerReadFailoverProxyProvider.java
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestHadoopRpcOMFollowerReadFailoverProxyProvider.java
index f77c5b561d4..d3275d22be6 100644
---
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestHadoopRpcOMFollowerReadFailoverProxyProvider.java
+++
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestHadoopRpcOMFollowerReadFailoverProxyProvider.java
@@ -49,8 +49,8 @@
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.io.retry.RetryInvocationHandler;
-import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io_.retry.RetryInvocationHandler;
+import org.apache.hadoop.io_.retry.RetryProxy;
import org.apache.hadoop.ipc_.RemoteException;
import org.apache.hadoop.ipc_.RpcNoSuchProtocolException;
import org.apache.hadoop.ozone.ClientVersion;
diff --git a/hadoop-ozone/dist/src/main/compose/test-all.sh
b/hadoop-ozone/dist/src/main/compose/test-all.sh
index 3ff94aed2db..f4bfdf3c284 100755
--- a/hadoop-ozone/dist/src/main/compose/test-all.sh
+++ b/hadoop-ozone/dist/src/main/compose/test-all.sh
@@ -34,7 +34,7 @@ source "$SCRIPT_DIR"/testlib.sh
if [[ "${OZONE_WITH_COVERAGE}" == "true" ]]; then
java -cp "$PROJECT_DIR"/share/coverage/$(ls "$PROJECT_DIR"/share/coverage |
grep test-util):"$PROJECT_DIR"/share/coverage/jacoco-core.jar
org.apache.ozone.test.JacocoServer &
DOCKER_BRIDGE_IP=$(docker network inspect bridge --format='{{(index
.IPAM.Config 0).Gateway}}')
- export
OZONE_OPTS="-javaagent:share/coverage/jacoco-agent.jar=output=tcpclient,address=$DOCKER_BRIDGE_IP,includes=org.apache.hadoop.ozone.*:org.apache.hadoop.hdds.*:org.apache.hadoop.fs.ozone.*:org.apache.ozone.*:org.apache.hadoop.ipc_.*:org.apache.hadoop.security_.*"
+ export
OZONE_OPTS="-javaagent:share/coverage/jacoco-agent.jar=output=tcpclient,address=$DOCKER_BRIDGE_IP,includes=org.apache.hadoop.ozone.*:org.apache.hadoop.hdds.*:org.apache.hadoop.fs.ozone.*:org.apache.ozone.*:org.apache.hadoop.io_.*:org.apache.hadoop.ipc_.*:org.apache.hadoop.security_.*"
fi
cd "$SCRIPT_DIR"
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
index ac0b9b25842..dd39145bc1b 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
@@ -41,7 +41,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.cli.GenericCli;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.io.retry.RetryInvocationHandler;
+import org.apache.hadoop.io_.retry.RetryInvocationHandler;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.OzoneConsts;
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
index 116b141ec28..dbb6c9cfe5f 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
@@ -29,7 +29,7 @@
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io_.retry.RetryProxy;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProviderBase;
diff --git a/pom.xml b/pom.xml
index 02e4bfa145b..ca3b761c49e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2333,7 +2333,7 @@
<goal>prepare-agent</goal>
</goals>
<configuration>
-
<includes>org.apache.hadoop.hdds.*,org.apache.hadoop.ozone.*,org.apache.hadoop.fs.ozone.*,org.apache.ozone.*,org.apache.hadoop.ipc_.*,org.apache.hadoop.security_.*</includes>
+
<includes>org.apache.hadoop.hdds.*,org.apache.hadoop.ozone.*,org.apache.hadoop.fs.ozone.*,org.apache.ozone.*,org.apache.hadoop.io_.*,org.apache.hadoop.ipc_.*,org.apache.hadoop.security_.*</includes>
</configuration>
</execution>
</executions>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]