[ 
https://issues.apache.org/jira/browse/HDFS-17027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17727725#comment-17727725
 ] 

ASF GitHub Bot commented on HDFS-17027:
---------------------------------------

simbadzina commented on code in PR #5693:
URL: https://github.com/apache/hadoop/pull/5693#discussion_r1210904336


##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java:
##########
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.ClientGSIContext;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcInvocationHandler;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.AUTO_MSYNC_PERIOD_KEY_PREFIX;
+import static 
org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.AUTO_MSYNC_PERIOD_DEFAULT;
+
+/**
+ * A {@link org.apache.hadoop.io.retry.FailoverProxyProvider} implementation
+ * to support automatic msync-ing when using routers.
+ *
+ * This constructs a wrapper proxy around an internal one, and
+ * injects msync calls when necessary via the InvocationHandler.
+ */
+public class RouterObserverReadProxyProvider<T> extends 
AbstractNNFailoverProxyProvider<T> {
+  @VisibleForTesting
+  static final Logger LOG = 
LoggerFactory.getLogger(ObserverReadProxyProvider.class);
+
+  /** Client-side context for syncing with the NameNode server side. */
+  private final AlignmentContext alignmentContext;
+
+  /** The inner proxy provider used for active/standby failover. */
+  private final AbstractNNFailoverProxyProvider<T> innerProxy;
+
+  /** The proxy which redirects the internal one. */
+  private final ProxyInfo<T> wrapperProxy;
+
+  /**
+   * Whether reading from observer is enabled. If this is false, this proxy
+   * will not call msync.
+   */
+  private final boolean observerReadEnabled;
+
+  /**
+   * This adjusts how frequently this proxy provider should auto-msync to the
+   * Active NameNode, automatically performing an msync() call to the active
+   * to fetch the current transaction ID before submitting read requests to
+   * observer nodes. See HDFS-14211 for more description of this feature.
+   * If this is below 0, never auto-msync. If this is 0, perform an msync on
+   * every read operation. If this is above 0, perform an msync after this many
+   * ms have elapsed since the last msync.
+   */
+  private final long autoMsyncPeriodMs;
+
+  /**
+   * The time, in millisecond epoch, that the last msync operation was
+   * performed. This includes any implicit msync (any operation which is
+   * serviced by the Active NameNode).
+   */
+  private volatile long lastMsyncTimeMs = -1;
+
+  public RouterObserverReadProxyProvider(Configuration conf, URI uri, Class<T> 
xface,
+      HAProxyFactory<T> factory) {
+    this(conf, uri, xface, factory, new IPFailoverProxyProvider<>(conf, uri, 
xface, factory));
+  }
+
+  @SuppressWarnings("unchecked")
+  public RouterObserverReadProxyProvider(Configuration conf, URI uri, Class<T> 
xface,
+      HAProxyFactory<T> factory, AbstractNNFailoverProxyProvider<T> 
failoverProxy) {
+    super(conf, uri, xface, factory);
+    this.alignmentContext = new ClientGSIContext();
+    factory.setAlignmentContext(alignmentContext);
+    this.innerProxy = failoverProxy;
+
+    String proxyInfoString = "RouterObserverReadProxyProvider for " + 
innerProxy.getProxy();
+
+    T wrappedProxy = (T) Proxy.newProxyInstance(
+        RouterObserverReadInvocationHandler.class.getClassLoader(),
+        new Class<?>[]{xface}, new RouterObserverReadInvocationHandler());
+    this.wrapperProxy = new ProxyInfo<>(wrappedProxy, proxyInfoString);
+
+    autoMsyncPeriodMs = conf.getTimeDuration(
+        // The host of the URI is the name service ID
+        AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + uri.getHost(),
+        AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
+
+    if (wrappedProxy instanceof ClientProtocol) {
+      this.observerReadEnabled = true;
+    } else {
+      LOG.info("Disabling observer reads for {} because the requested proxy "
+          + "class does not implement {}", uri, 
ClientProtocol.class.getName());
+      this.observerReadEnabled = false;
+    }
+  }
+
+
+  public AlignmentContext getAlignmentContext() {
+    return alignmentContext;
+  }
+
+  @Override
+  public ProxyInfo<T> getProxy() {
+    return wrapperProxy;
+  }
+
+  @Override
+  public void performFailover(T currentProxy) {
+    innerProxy.performFailover(currentProxy);
+  }
+
+  @Override
+  public boolean useLogicalURI() {
+    return innerProxy.useLogicalURI();
+  }
+
+  @Override
+  public void close() throws IOException {
+    innerProxy.close();
+  }
+
+  /**
+   * Return the input proxy, cast as a {@link ClientProtocol}. This catches any
+   * {@link ClassCastException} and wraps it in a more helpful message. This
+   * should ONLY be called if the caller is certain that the proxy is, in fact,
+   * a {@link ClientProtocol}.
+   */
+  private ClientProtocol getProxyAsClientProtocol(T proxy) {
+    assert proxy instanceof ClientProtocol : "BUG: Attempted to use proxy of 
class "
+        + proxy.getClass()
+        + " as if it was a ClientProtocol.";
+    return (ClientProtocol) proxy;
+  }
+
+  /**
+   * This will call {@link ClientProtocol#msync()} on the active NameNode
+   * (via the {@link #innerProxy}) to update the state of this client, only
+   * if at least {@link #autoMsyncPeriodMs} ms has elapsed since the last time
+   * an msync was performed.
+   *
+   * @see #autoMsyncPeriodMs
+   */
+  private void autoMsyncIfNecessary() throws IOException {
+    if (autoMsyncPeriodMs == 0) {
+      // Always msync
+      getProxyAsClientProtocol(innerProxy.getProxy().proxy).msync();
+    } else if (autoMsyncPeriodMs > 0) {
+      if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
+        synchronized (this) {
+          // Use a synchronized block so that only one thread will msync
+          // if many operations are submitted around the same time.
+          // Re-check the entry criterion since the status may have changed
+          // while waiting for the lock.
+          if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
+            getProxyAsClientProtocol(innerProxy.getProxy().proxy).msync();
+            lastMsyncTimeMs = Time.monotonicNow();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Check if a method is read-only.
+   *
+   * @return whether the 'method' is a read-only operation.
+   */
+  private static boolean isRead(Method method) {
+    if (!method.isAnnotationPresent(ReadOnly.class)) {
+      return false;
+    }
+    return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly();
+  }
+
+  private class RouterObserverReadInvocationHandler implements 
RpcInvocationHandler {
+
+    @Override
+    public Client.ConnectionId getConnectionId() {
+      return RPC.getConnectionIdForProxy(innerProxy.getProxy().proxy);
+    }
+
+    @Override
+    public void close() throws IOException {
+      innerProxy.close();
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+      if (observerReadEnabled && isRead(method)) {
+        autoMsyncIfNecessary();
+      }
+
+      Object retVal;
+      try {
+        retVal = method.invoke(innerProxy.getProxy().proxy, args);
+      } catch (InvocationTargetException e) {
+        // This exception will be handled by higher layers
+        throw e.getCause();
+      }
+
+      lastMsyncTimeMs = Time.monotonicNow();

Review Comment:
   Good catch, we don't need the update outside of `autoMsyncIfNecessary`. 
Having that extra update is actually a bug. I've added a test case that would 
fail if that extra update is there.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java:
##########
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.ClientGSIContext;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcInvocationHandler;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.AUTO_MSYNC_PERIOD_KEY_PREFIX;
+import static 
org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.AUTO_MSYNC_PERIOD_DEFAULT;
+
+/**
+ * A {@link org.apache.hadoop.io.retry.FailoverProxyProvider} implementation
+ * to support automatic msync-ing when using routers.
+ *
+ * This constructs a wrapper proxy around an internal one, and
+ * injects msync calls when necessary via the InvocationHandler.
+ */
+public class RouterObserverReadProxyProvider<T> extends 
AbstractNNFailoverProxyProvider<T> {
+  @VisibleForTesting
+  static final Logger LOG = 
LoggerFactory.getLogger(ObserverReadProxyProvider.class);
+
+  /** Client-side context for syncing with the NameNode server side. */
+  private final AlignmentContext alignmentContext;
+
+  /** The inner proxy provider used for active/standby failover. */
+  private final AbstractNNFailoverProxyProvider<T> innerProxy;
+
+  /** The proxy which redirects the internal one. */
+  private final ProxyInfo<T> wrapperProxy;
+
+  /**
+   * Whether reading from observer is enabled. If this is false, this proxy
+   * will not call msync.
+   */
+  private final boolean observerReadEnabled;
+
+  /**
+   * This adjusts how frequently this proxy provider should auto-msync to the
+   * Active NameNode, automatically performing an msync() call to the active
+   * to fetch the current transaction ID before submitting read requests to
+   * observer nodes. See HDFS-14211 for more description of this feature.
+   * If this is below 0, never auto-msync. If this is 0, perform an msync on
+   * every read operation. If this is above 0, perform an msync after this many
+   * ms have elapsed since the last msync.
+   */
+  private final long autoMsyncPeriodMs;
+
+  /**
+   * The time, in millisecond epoch, that the last msync operation was
+   * performed. This includes any implicit msync (any operation which is
+   * serviced by the Active NameNode).
+   */
+  private volatile long lastMsyncTimeMs = -1;
+
+  public RouterObserverReadProxyProvider(Configuration conf, URI uri, Class<T> 
xface,
+      HAProxyFactory<T> factory) {
+    this(conf, uri, xface, factory, new IPFailoverProxyProvider<>(conf, uri, 
xface, factory));
+  }
+
+  @SuppressWarnings("unchecked")
+  public RouterObserverReadProxyProvider(Configuration conf, URI uri, Class<T> 
xface,
+      HAProxyFactory<T> factory, AbstractNNFailoverProxyProvider<T> 
failoverProxy) {
+    super(conf, uri, xface, factory);
+    this.alignmentContext = new ClientGSIContext();
+    factory.setAlignmentContext(alignmentContext);
+    this.innerProxy = failoverProxy;
+
+    String proxyInfoString = "RouterObserverReadProxyProvider for " + 
innerProxy.getProxy();
+
+    T wrappedProxy = (T) Proxy.newProxyInstance(
+        RouterObserverReadInvocationHandler.class.getClassLoader(),
+        new Class<?>[]{xface}, new RouterObserverReadInvocationHandler());
+    this.wrapperProxy = new ProxyInfo<>(wrappedProxy, proxyInfoString);
+
+    autoMsyncPeriodMs = conf.getTimeDuration(
+        // The host of the URI is the name service ID
+        AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + uri.getHost(),
+        AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
+
+    if (wrappedProxy instanceof ClientProtocol) {
+      this.observerReadEnabled = true;
+    } else {
+      LOG.info("Disabling observer reads for {} because the requested proxy "
+          + "class does not implement {}", uri, 
ClientProtocol.class.getName());
+      this.observerReadEnabled = false;
+    }
+  }
+
+
+  public AlignmentContext getAlignmentContext() {
+    return alignmentContext;
+  }
+
+  @Override
+  public ProxyInfo<T> getProxy() {
+    return wrapperProxy;
+  }
+
+  @Override
+  public void performFailover(T currentProxy) {
+    innerProxy.performFailover(currentProxy);
+  }
+
+  @Override
+  public boolean useLogicalURI() {
+    return innerProxy.useLogicalURI();
+  }
+
+  @Override
+  public void close() throws IOException {
+    innerProxy.close();
+  }
+
+  /**
+   * Return the input proxy, cast as a {@link ClientProtocol}. This catches any
+   * {@link ClassCastException} and wraps it in a more helpful message. This
+   * should ONLY be called if the caller is certain that the proxy is, in fact,
+   * a {@link ClientProtocol}.
+   */
+  private ClientProtocol getProxyAsClientProtocol(T proxy) {
+    assert proxy instanceof ClientProtocol : "BUG: Attempted to use proxy of 
class "
+        + proxy.getClass()
+        + " as if it was a ClientProtocol.";
+    return (ClientProtocol) proxy;
+  }
+
+  /**
+   * This will call {@link ClientProtocol#msync()} on the active NameNode
+   * (via the {@link #innerProxy}) to update the state of this client, only
+   * if at least {@link #autoMsyncPeriodMs} ms has elapsed since the last time
+   * an msync was performed.
+   *
+   * @see #autoMsyncPeriodMs
+   */
+  private void autoMsyncIfNecessary() throws IOException {
+    if (autoMsyncPeriodMs == 0) {
+      // Always msync
+      getProxyAsClientProtocol(innerProxy.getProxy().proxy).msync();
+    } else if (autoMsyncPeriodMs > 0) {
+      if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
+        synchronized (this) {
+          // Use a synchronized block so that only one thread will msync
+          // if many operations are submitted around the same time.
+          // Re-check the entry criterion since the status may have changed
+          // while waiting for the lock.
+          if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
+            getProxyAsClientProtocol(innerProxy.getProxy().proxy).msync();
+            lastMsyncTimeMs = Time.monotonicNow();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Check if a method is read-only.
+   *
+   * @return whether the 'method' is a read-only operation.
+   */
+  private static boolean isRead(Method method) {
+    if (!method.isAnnotationPresent(ReadOnly.class)) {
+      return false;
+    }
+    return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly();
+  }
+
+  private class RouterObserverReadInvocationHandler implements 
RpcInvocationHandler {
+
+    @Override
+    public Client.ConnectionId getConnectionId() {
+      return RPC.getConnectionIdForProxy(innerProxy.getProxy().proxy);
+    }
+
+    @Override
+    public void close() throws IOException {
+      innerProxy.close();
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+      if (observerReadEnabled && isRead(method)) {
+        autoMsyncIfNecessary();
+      }
+
+      Object retVal;
+      try {
+        retVal = method.invoke(innerProxy.getProxy().proxy, args);
+      } catch (InvocationTargetException e) {
+        // This exception will be handled by higher layers
+        throw e.getCause();
+      }
+
+      lastMsyncTimeMs = Time.monotonicNow();

Review Comment:
   Good catch, we don't need the update outside of `autoMsyncIfNecessary`. 
Having that extra update is a bug. I've added a test case that would fail if 
that extra update is there.





> RBF: Add supports for observer.auto-msync-period when using routers
> -------------------------------------------------------------------
>
>                 Key: HDFS-17027
>                 URL: https://issues.apache.org/jira/browse/HDFS-17027
>             Project: Hadoop HDFS
>          Issue Type: Bug
>            Reporter: Simbarashe Dzinamarira
>            Assignee: Simbarashe Dzinamarira
>            Priority: Major
>              Labels: pull-request-available
>
> Non-RBF clients that use observer reads have the option to set 
> *dfs.client.failover.observer.auto-msync-period.<nameservice>* . This config 
> makes the client automatically do an msync, allowing clients to use the 
> observer reads feature without any code change.
> To use observer reads with RBF, clients set 
> *dfs.client.rbf.observer.read.enable*. The way this flag is implemented does 
> not allow clients to use the *auto-msync-period* config. So with RBF, clients 
> either have to 
> # Not use observer reads
> # Use observer reads with the risk of stale reads
> # Make code changes to explicitly call msync.
> We should add support for 
> *dfs.client.failover.observer.auto-msync-period.<nameservice>*. This can be 
> done by adding a ProxyProvider, in a similar manner to the 
> ObserverReadProxyProvider.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to