jojochuang commented on code in PR #7379:
URL: https://github.com/apache/ozone/pull/7379#discussion_r1823745162


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java:
##########
@@ -39,6 +39,60 @@ public class OzoneClientConfig {
   private static final Logger LOG =
       LoggerFactory.getLogger(OzoneClientConfig.class);
 
+  public static final boolean OZONE_READ_SHORT_CIRCUIT_DEFAULT = false;
+  public static final String OZONE_DOMAIN_SOCKET_PATH = "ozone.domain.socket.path";
+  public static final String OZONE_DOMAIN_SOCKET_PATH_DEFAULT = "/var/lib/ozone/dn_socket";
+  public static final String SHORT_CIRCUIT_PREFIX = "read.short-circuit.";
+  public static final short DATA_TRANSFER_VERSION = 28;
+  public static final byte DATA_TRANSFER_MAGIC_CODE = 99;
+
+  @Config(key = "read.short-circuit",
+      defaultValue = "false",
+      type = ConfigType.BOOLEAN,
+      description = "Whether read short-circuit is enabled or not",
+      tags = { ConfigTag.CLIENT, ConfigTag.DATANODE })
+  private boolean shortCircuitEnabled = OZONE_READ_SHORT_CIRCUIT_DEFAULT;
+
+  @Config(key = SHORT_CIRCUIT_PREFIX + "buffer.size",
+      defaultValue = "128KB",
+      type = ConfigType.SIZE,
+      description = "Buffer size of reader/writer.",
+      tags = { ConfigTag.CLIENT, ConfigTag.DATANODE })
+  private int shortCircuitBufferSize = 128 * 1024;

Review Comment:
   HDFS's default short-circuit read buffer size is 1 MB (`dfs.client.read.shortcircuit.buffer.size`), not 128 KB. Should we align the default here?
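
    For reference, a rough sketch of what aligning with that 1 MB default could look like, keeping the field initializer in sync with the annotation (names taken from this PR; an illustration only, not a required change):

    ```java
    // Sketch only: bump the default to match HDFS's 1 MB short-circuit buffer.
    @Config(key = SHORT_CIRCUIT_PREFIX + "buffer.size",
        defaultValue = "1MB",
        type = ConfigType.SIZE,
        description = "Buffer size of reader/writer.",
        tags = { ConfigTag.CLIENT, ConfigTag.DATANODE })
    private int shortCircuitBufferSize = 1024 * 1024;
    ```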



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java:
##########
@@ -39,6 +39,60 @@ public class OzoneClientConfig {
   private static final Logger LOG =
       LoggerFactory.getLogger(OzoneClientConfig.class);
 
+  public static final boolean OZONE_READ_SHORT_CIRCUIT_DEFAULT = false;
+  public static final String OZONE_DOMAIN_SOCKET_PATH = "ozone.domain.socket.path";
+  public static final String OZONE_DOMAIN_SOCKET_PATH_DEFAULT = "/var/lib/ozone/dn_socket";

Review Comment:
   Can we place the socket under an existing configured directory such as `ozone.metadata.dirs` or `hdds.metadata.dir` instead of hard-coding `/var/lib/ozone`?
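
    For illustration only, resolving the socket path under the first configured metadata dir might look roughly like the helper below; `SocketPathResolver` and `resolveSocketPath` are hypothetical names, and the fallback to this PR's hard-coded default is just one option:

    ```java
    import java.io.File;

    import org.apache.hadoop.hdds.conf.ConfigurationSource;
    import org.apache.hadoop.hdds.scm.OzoneClientConfig;

    // Hypothetical sketch: derive the domain socket location from an existing
    // metadata directory instead of hard-coding /var/lib/ozone.
    final class SocketPathResolver {
      private SocketPathResolver() { }

      static String resolveSocketPath(ConfigurationSource conf) {
        String metaDirs = conf.get("ozone.metadata.dirs");
        if (metaDirs != null && !metaDirs.isEmpty()) {
          // Use the first configured metadata directory as the parent.
          String first = metaDirs.split(",")[0].trim();
          return new File(first, "dn_socket").getAbsolutePath();
        }
        // Otherwise keep the default introduced in this PR.
        return OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH_DEFAULT;
      }
    }
    ```

    One caveat worth checking: UNIX domain socket paths have a small length limit (roughly 100 bytes), and DomainSocket validates permissions on the parent directories, so not every metadata dir location would be usable.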



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainSocketFactory.java:
##########
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *  A factory to help create DomainSocket.
+ */
+public final class DomainSocketFactory implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DomainSocketFactory.class);
+
+  /**
+   *  Domain socket path state.
+   */
+  public enum PathState {
+    NOT_CONFIGURED(false),
+    DISABLED(false),
+    VALID(true);
+
+    PathState(boolean usableForShortCircuit) {
+      this.usableForShortCircuit = usableForShortCircuit;
+    }
+
+    public boolean getUsableForShortCircuit() {
+      return usableForShortCircuit;
+    }
+    private final boolean usableForShortCircuit;
+  }
+
+  /**
+   *  Domain socket path.
+   */
+  public static class PathInfo {
+    private static final PathInfo NOT_CONFIGURED = new PathInfo("", PathState.NOT_CONFIGURED);
+    private static final PathInfo DISABLED = new PathInfo("", PathState.DISABLED);
+    private static final PathInfo VALID = new PathInfo("", PathState.VALID);
+
+    private final String path;
+    private final PathState state;
+
+    PathInfo(String path, PathState state) {
+      this.path = path;
+      this.state = state;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public PathState getPathState() {
+      return state;
+    }
+
+    @Override
+    public String toString() {
+      return "PathInfo{path=" + path + ", state=" + state + "}";
+    }
+  }
+
+  public static final String FEATURE = "short-circuit reads";
+  public static final String FEATURE_FLAG = "SC";
+  private static boolean nativeCodeLoaded = false;
+  private static String nativeLibraryLoadFailureReason;
+  private long pathExpireMills;
+  private final ConcurrentHashMap<String, PathInfo> pathMap;
+  private Timer timer;
+  private boolean isEnabled = false;
+  private String domainSocketPath;
+
+  static {
+    // Try to load native hadoop library and set fallback flag appropriately
+    if (SystemUtils.IS_OS_WINDOWS) {
+      nativeLibraryLoadFailureReason = "UNIX Domain sockets are not available on Windows.";
+    } else {
+      LOG.info("Trying to load the custom-built native-hadoop library...");
+      try {
+        System.loadLibrary("hadoop");
+        LOG.info("Loaded the native-hadoop library");
+        nativeCodeLoaded = true;
+      } catch (Throwable t) {
+        // Ignore failure to continue
+        LOG.info("Failed to load native-hadoop with error: " + t);
+        LOG.info("java.library.path=" + 
System.getProperty("java.library.path"));
+        nativeLibraryLoadFailureReason = "libhadoop cannot be loaded.";
+      }
+
+      if (!nativeCodeLoaded) {
+        LOG.warn("Unable to load native-hadoop library for your platform... " +
+            "using builtin-java classes where applicable");
+      }
+    }
+  }
+
+  private static volatile DomainSocketFactory instance = null;
+
+  public static synchronized DomainSocketFactory getInstance(ConfigurationSource conf) {
+    if (instance == null) {
+      instance = new DomainSocketFactory(conf);
+    }
+    return instance;
+  }

Review Comment:
    ```suggestion
      public static DomainSocketFactory getInstance(ConfigurationSource conf) {
        if (instance == null) {
          synchronized (DomainSocketFactory.class) {
            if (instance == null) {
              instance = new DomainSocketFactory(conf);
            }
          }
        }
        return instance;
      }
    ```
    
    Use double-checked locking instead of synchronizing the whole method: https://www.baeldung.com/java-singleton-double-checked-locking. Note the second null check inside the synchronized block; without it, two threads can both pass the outer check and create two instances. The `instance` field is already `volatile`, which this pattern requires.



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainPeer.java:
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+
+/**
+ * Represents a peer that we communicate with by using blocking I/O
+ * on a UNIX domain socket.
+ */
+public class DomainPeer implements Closeable {
+  private final DomainSocket socket;
+  private final OutputStream out;
+  private final InputStream in;
+  private final ReadableByteChannel channel;
+  public static final Logger LOG = LoggerFactory.getLogger(DomainPeer.class);
+
+  public DomainPeer(DomainSocket socket) {
+    this.socket = socket;
+    this.out = socket.getOutputStream();
+    this.in = socket.getInputStream();
+    this.channel = socket.getChannel();
+  }
+
+  public ReadableByteChannel getInputStreamChannel() {
+    return channel;
+  }
+
+  public void setReadTimeout(int timeoutMs) throws IOException {
+    socket.setAttribute(DomainSocket.RECEIVE_TIMEOUT, timeoutMs);
+  }
+
+  public int getReceiveBufferSize() throws IOException {
+    return socket.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
+  }
+
+  public void setWriteTimeout(int timeoutMs) throws IOException {
+    socket.setAttribute(DomainSocket.SEND_TIMEOUT, timeoutMs);
+  }
+
+  public boolean isClosed() {
+    return !socket.isOpen();
+  }
+
+  public void close() throws IOException {
+    socket.close();
+    LOG.info("{} is closed", socket);
+  }
+
+  public String getRemoteAddressString() {
+    return "unix:{" + socket.toString() + "}";
+  }
+
+  public String getLocalAddressString() {
+    return "<local>";
+  }
+
+  public InputStream getInputStream() throws IOException {
+    return in;
+  }
+
+  public OutputStream getOutputStream() throws IOException {
+    return out;
+  }
+
+  @Override
+  public String toString() {
+    return "DomainPeer(" + getRemoteAddressString() + ")";
+  }
+
+  public DomainSocket getDomainSocket() {
+    return socket;
+  }
+
+  public boolean hasSecureChannel() {
+    //
+    // Communication over domain sockets is assumed to be secure, since it
+    // doesn't pass over any network. We also carefully control the privileges
+    // that can be used on the domain socket inode and its parent directories.
+    // See #{java.org.apache.hadoop.net.unix.DomainSocket#validateSocketPathSecurity0}
+    // for details.
+    //
+    // So unless you are running as root or the hdfs superuser, you cannot

Review Comment:
   ```suggestion
       // So unless you are running as root or the ozone superuser, you cannot
   ```


