This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-10685
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-10685 by this push:
new fdef943fcc HDDS-11622. Support domain socket creation (#7397)
fdef943fcc is described below
commit fdef943fcc14e4b4a954cf7af707db70a4275f72
Author: Sammi Chen <[email protected]>
AuthorDate: Wed Nov 6 12:50:00 2024 +0800
HDDS-11622. Support domain socket creation (#7397)
---
.../apache/hadoop/hdds/scm/OzoneClientConfig.java | 54 ++++
.../apache/hadoop/hdds/scm/storage/DomainPeer.java | 111 +++++++++
.../hdds/scm/storage/DomainSocketFactory.java | 272 +++++++++++++++++++++
3 files changed, 437 insertions(+)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 5426bbc498..3497f3359f 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -39,6 +39,60 @@ public class OzoneClientConfig {
private static final Logger LOG =
LoggerFactory.getLogger(OzoneClientConfig.class);
+  /** Default value of the {@code read.short-circuit} setting below. */
+  public static final boolean OZONE_READ_SHORT_CIRCUIT_DEFAULT = false;
+  /** Configuration key of the domain socket path. */
+  public static final String OZONE_DOMAIN_SOCKET_PATH =
+      "ozone.domain.socket.path";
+  /** Domain socket path used when {@link #OZONE_DOMAIN_SOCKET_PATH} is unset. */
+  public static final String OZONE_DOMAIN_SOCKET_PATH_DEFAULT =
+      "/var/lib/ozone/dn_socket";
+  /** Common key prefix of the short-circuit settings declared below. */
+  public static final String SHORT_CIRCUIT_PREFIX = "read.short-circuit.";
+  // NOTE(review): not referenced in this hunk; presumably the version and
+  // magic code of the short-circuit data transfer protocol - confirm against
+  // the datanode side before changing either value.
+  public static final short DATA_TRANSFER_VERSION = 28;
+  public static final byte DATA_TRANSFER_MAGIC_CODE = 99;
+
+  @Config(key = "read.short-circuit",
+      defaultValue = "false",
+      type = ConfigType.BOOLEAN,
+      description = "Whether read short-circuit is enabled or not",
+      tags = { ConfigTag.CLIENT, ConfigTag.DATANODE })
+  private boolean shortCircuitEnabled = OZONE_READ_SHORT_CIRCUIT_DEFAULT;
+
+  // The field initializer mirrors defaultValue (128 KB).
+  @Config(key = SHORT_CIRCUIT_PREFIX + "buffer.size",
+      defaultValue = "128KB",
+      type = ConfigType.SIZE,
+      description = "Buffer size of reader/writer.",
+      tags = { ConfigTag.CLIENT, ConfigTag.DATANODE })
+  private int shortCircuitBufferSize = 128 * 1024;
+
+  // Value is in seconds; the field initializer mirrors defaultValue (600 s).
+  @Config(key = SHORT_CIRCUIT_PREFIX + "disable.interval",
+      defaultValue = "600",
+      type = ConfigType.LONG,
+      description = "If some unknown IO error happens on Domain socket read, " +
+          "short circuit read will be disabled " +
+          "temporarily for this period of time(seconds).",
+      tags = { ConfigTag.CLIENT })
+  private long shortCircuitReadDisableInterval = 60 * 10;
+
+  /**
+   * @return {@code true} when short-circuit reads are switched on
+   */
+  public boolean isShortCircuitEnabled() {
+    return this.shortCircuitEnabled;
+  }
+
+  /**
+   * Switches short-circuit reads on or off.
+   *
+   * @param enabled new value of the short-circuit flag
+   */
+  public void setShortCircuit(boolean enabled) {
+    this.shortCircuitEnabled = enabled;
+  }
+
+  /**
+   * @return the configured short-circuit reader/writer buffer size
+   */
+  public int getShortCircuitBufferSize() {
+    return this.shortCircuitBufferSize;
+  }
+
+  /**
+   * Overrides the short-circuit reader/writer buffer size.
+   *
+   * @param size new buffer size
+   */
+  public void setShortCircuitBufferSize(int size) {
+    this.shortCircuitBufferSize = size;
+  }
+
+  /**
+   * @return how long (in seconds) short-circuit reads stay disabled after an
+   *     unexpected domain socket IO error
+   */
+  public long getShortCircuitReadDisableInterval() {
+    return this.shortCircuitReadDisableInterval;
+  }
+
+  /**
+   * Overrides the short-circuit disable interval.
+   *
+   * @param value new interval, in seconds
+   */
+  public void setShortCircuitReadDisableInterval(long value) {
+    this.shortCircuitReadDisableInterval = value;
+  }
+
/**
* Enum for indicating what mode to use when combining chunk and block
* checksums to define an aggregate FileChecksum. This should be considered
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainPeer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainPeer.java
new file mode 100644
index 0000000000..3fcebb7f0b
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainPeer.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+
+/**
+ * Represents a peer that we communicate with by using blocking I/O
+ * on a UNIX domain socket.
+ *
+ * <p>The input stream, output stream and channel are taken from the socket
+ * once, in the constructor, and the accessors return those same objects.
+ */
+public class DomainPeer implements Closeable {
+  public static final Logger LOG = LoggerFactory.getLogger(DomainPeer.class);
+
+  private final DomainSocket socket;
+  private final OutputStream out;
+  private final InputStream in;
+  private final ReadableByteChannel channel;
+
+  /**
+   * Wraps {@code socket}; it is closed by {@link #close()}.
+   *
+   * @param socket the domain socket to communicate over
+   */
+  public DomainPeer(DomainSocket socket) {
+    this.socket = socket;
+    this.out = socket.getOutputStream();
+    this.in = socket.getInputStream();
+    this.channel = socket.getChannel();
+  }
+
+  /** @return the readable channel captured from the socket at construction. */
+  public ReadableByteChannel getInputStreamChannel() {
+    return channel;
+  }
+
+  /** Sets the socket's {@code RECEIVE_TIMEOUT} attribute, in milliseconds. */
+  public void setReadTimeout(int timeoutMs) throws IOException {
+    socket.setAttribute(DomainSocket.RECEIVE_TIMEOUT, timeoutMs);
+  }
+
+  /** @return the socket's {@code RECEIVE_BUFFER_SIZE} attribute. */
+  public int getReceiveBufferSize() throws IOException {
+    return socket.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
+  }
+
+  /** Sets the socket's {@code SEND_TIMEOUT} attribute, in milliseconds. */
+  public void setWriteTimeout(int timeoutMs) throws IOException {
+    socket.setAttribute(DomainSocket.SEND_TIMEOUT, timeoutMs);
+  }
+
+  /** @return {@code true} once the underlying socket is no longer open. */
+  public boolean isClosed() {
+    return !socket.isOpen();
+  }
+
+  /**
+   * Closes the underlying domain socket.
+   *
+   * @throws IOException if closing the socket fails
+   */
+  @Override
+  public void close() throws IOException {
+    socket.close();
+    // DEBUG rather than INFO: this runs for every peer that is closed.
+    LOG.debug("{} is closed", socket);
+  }
+
+  public String getRemoteAddressString() {
+    return "unix:{" + socket.toString() + "}";
+  }
+
+  public String getLocalAddressString() {
+    return "<local>";
+  }
+
+  public InputStream getInputStream() throws IOException {
+    return in;
+  }
+
+  public OutputStream getOutputStream() throws IOException {
+    return out;
+  }
+
+  @Override
+  public String toString() {
+    return "DomainPeer(" + getRemoteAddressString() + ")";
+  }
+
+  public DomainSocket getDomainSocket() {
+    return socket;
+  }
+
+  /** @return always {@code true}; see the reasoning in the method body. */
+  public boolean hasSecureChannel() {
+    //
+    // Communication over domain sockets is assumed to be secure, since it
+    // doesn't pass over any network. We also carefully control the privileges
+    // that can be used on the domain socket inode and its parent directories.
+    // See org.apache.hadoop.net.unix.DomainSocket#validateSocketPathSecurity0
+    // for details.
+    //
+    // So unless you are running as root or the user launches the service, you
+    // cannot launch a man-in-the-middle attack on UNIX domain socket traffic.
+    //
+    return true;
+  }
+}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainSocketFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainSocketFactory.java
new file mode 100644
index 0000000000..e62e2a6bfd
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainSocketFactory.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A factory to help create DomainSocket.
+ *
+ * <p>One process-wide instance is obtained through
+ * {@link #getInstance(ConfigurationSource)}. The configuration passed on the
+ * first call decides its behaviour; later calls return the same instance.
+ * Paths that fail are marked {@link PathState#DISABLED} and are switched back
+ * to {@link PathState#VALID} by a timer after {@link #getPathExpireMills()}.
+ */
+public final class DomainSocketFactory implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DomainSocketFactory.class);
+
+  /**
+   * Domain socket path state.
+   */
+  public enum PathState {
+    NOT_CONFIGURED(false),
+    DISABLED(false),
+    VALID(true);
+
+    private final boolean usableForShortCircuit;
+
+    PathState(boolean usableForShortCircuit) {
+      this.usableForShortCircuit = usableForShortCircuit;
+    }
+
+    public boolean getUsableForShortCircuit() {
+      return usableForShortCircuit;
+    }
+  }
+
+  /**
+   * A domain socket path together with its current state.
+   */
+  public static class PathInfo {
+    private static final PathInfo NOT_CONFIGURED =
+        new PathInfo("", PathState.NOT_CONFIGURED);
+    private static final PathInfo DISABLED =
+        new PathInfo("", PathState.DISABLED);
+
+    private final String path;
+    private final PathState state;
+
+    PathInfo(String path, PathState state) {
+      this.path = path;
+      this.state = state;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public PathState getPathState() {
+      return state;
+    }
+
+    @Override
+    public String toString() {
+      return "PathInfo{path=" + path + ", state=" + state + "}";
+    }
+  }
+
+  public static final String FEATURE = "short-circuit reads";
+  public static final String FEATURE_FLAG = "SC";
+  // Declared (with its initializer) before the static block, which may set it
+  // to true; the initializer must therefore run first.
+  private static boolean nativeLibraryLoaded = false;
+  private static String nativeLibraryLoadFailureReason;
+
+  static {
+    // Try to load native hadoop library and set fallback flag appropriately
+    if (SystemUtils.IS_OS_WINDOWS) {
+      nativeLibraryLoadFailureReason =
+          "UNIX Domain sockets are not available on Windows.";
+    } else {
+      LOG.info("Trying to load the custom-built native-hadoop library...");
+      try {
+        System.loadLibrary("hadoop");
+        LOG.info("Loaded the native-hadoop library");
+        nativeLibraryLoaded = true;
+      } catch (Throwable t) {
+        // Deliberately best-effort: without libhadoop the factory stays
+        // disabled instead of failing class initialization.
+        LOG.info("Failed to load native-hadoop with error: {}", t.toString());
+        LOG.info("java.library.path={}",
+            System.getProperty("java.library.path"));
+        nativeLibraryLoadFailureReason = "libhadoop cannot be loaded.";
+      }
+
+      if (!nativeLibraryLoaded) {
+        LOG.warn("Unable to load native-hadoop library for your platform... " +
+            "using builtin-java classes where applicable");
+      }
+    }
+  }
+
+  private static volatile DomainSocketFactory instance = null;
+
+  /** How long, in milliseconds, a failed path stays disabled. */
+  private final long pathExpireMills;
+  /** Path -> state. ConcurrentHashMap rejects null keys, so never put null. */
+  private final ConcurrentHashMap<String, PathInfo> pathMap;
+  /** Re-enables disabled paths; {@code null} unless the feature is enabled. */
+  private final Timer timer;
+  private final boolean isEnabled;
+  /** Configured socket path; {@code null} when short-circuit is disabled. */
+  private final String domainSocketPath;
+
+  /**
+   * Returns the process-wide factory, creating it from {@code conf} on the
+   * first call. Later calls ignore {@code conf}.
+   *
+   * @param conf configuration used only when the instance is first created
+   * @return the shared factory
+   * @throws IllegalArgumentException if short-circuit reads are enabled but
+   *     the domain socket path is empty
+   */
+  public static DomainSocketFactory getInstance(ConfigurationSource conf) {
+    DomainSocketFactory local = instance;
+    if (local == null) {
+      synchronized (DomainSocketFactory.class) {
+        local = instance;
+        if (local == null) {
+          local = new DomainSocketFactory(conf);
+          instance = local;
+        }
+      }
+    }
+    return local;
+  }
+
+  private DomainSocketFactory(ConfigurationSource conf) {
+    long startTime = System.nanoTime();
+    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+    pathExpireMills = clientConfig.getShortCircuitReadDisableInterval() * 1000;
+    pathMap = new ConcurrentHashMap<>();
+
+    if (!clientConfig.isShortCircuitEnabled()) {
+      LOG.info("{} is disabled.", FEATURE);
+      // There is no path to record, and ConcurrentHashMap rejects null keys.
+      // Every lookup is guarded by isEnabled instead.
+      domainSocketPath = null;
+      isEnabled = false;
+      timer = null;
+    } else {
+      String path = conf.get(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH,
+          OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH_DEFAULT);
+      if (path == null || path.isEmpty()) {
+        throw new IllegalArgumentException(FEATURE + " is enabled but "
+            + OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH + " is not set.");
+      }
+      domainSocketPath = path;
+      if (!nativeLibraryLoaded) {
+        LOG.warn("{} cannot be used because {}", FEATURE,
+            nativeLibraryLoadFailureReason);
+        pathMap.put(path, new PathInfo(path, PathState.DISABLED));
+        isEnabled = false;
+        timer = null;
+      } else {
+        pathMap.put(path, new PathInfo(path, PathState.VALID));
+        isEnabled = true;
+        // Daemon thread: the singleton may never be closed, and a non-daemon
+        // Timer thread can keep the client JVM from exiting.
+        timer = new Timer(DomainSocketFactory.class.getSimpleName() + "-Timer",
+            true);
+        LOG.info("{} is enabled within {} ns.", FEATURE,
+            System.nanoTime() - startTime);
+      }
+    }
+  }
+
+  /** @return whether short-circuit reads were enabled at construction. */
+  public boolean isServiceEnabled() {
+    return isEnabled;
+  }
+
+  /**
+   * @return {@code true} when the service is enabled and the configured path
+   *     is not currently disabled
+   */
+  public boolean isServiceReady() {
+    if (!isEnabled) {
+      return false;
+    }
+    PathInfo status = pathMap.get(domainSocketPath);
+    // A missing entry (e.g. after clearPathMap()) means nothing disabled it.
+    return status == null || status.getPathState() == PathState.VALID;
+  }
+
+  /**
+   * Get information about a domain socket path. Caller must make sure that
+   * addr is a local address.
+   *
+   * @param addr The local inet address to use.
+   * @return Information about the socket path.
+   */
+  public PathInfo getPathInfo(InetSocketAddress addr) {
+    if (!isEnabled) {
+      return PathInfo.NOT_CONFIGURED;
+    }
+
+    if (!isServiceReady()) {
+      return PathInfo.DISABLED;
+    }
+
+    String escapedPath = DomainSocket.getEffectivePath(domainSocketPath,
+        addr.getPort());
+    // Atomic: concurrent callers all observe the same stored PathInfo.
+    return pathMap.computeIfAbsent(escapedPath,
+        p -> new PathInfo(p, PathState.VALID));
+  }
+
+  /**
+   * Create DomainSocket for addr. Caller must make sure that addr is a local
+   * address.
+   *
+   * <p>On failure the effective path is disabled for
+   * {@link #getPathExpireMills()} ms and the exception is rethrown.
+   *
+   * @return the connected socket, or {@code null} if the service is not ready
+   * @throws IOException if connecting or configuring the socket fails
+   */
+  public DomainSocket createSocket(int readTimeoutMs, int writeTimeoutMs,
+      InetSocketAddress addr) throws IOException {
+    if (!isServiceReady()) {
+      return null;
+    }
+    boolean success = false;
+    DomainSocket sock = null;
+    String escapedPath = null;
+    long startTime = System.nanoTime();
+    try {
+      escapedPath = DomainSocket.getEffectivePath(domainSocketPath,
+          addr.getPort());
+      sock = DomainSocket.connect(escapedPath);
+      sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, readTimeoutMs);
+      sock.setAttribute(DomainSocket.SEND_TIMEOUT, writeTimeoutMs);
+      success = true;
+      LOG.debug("{} is created within {} ns", sock,
+          System.nanoTime() - startTime);
+      return sock;
+    } catch (IOException e) {
+      LOG.error("Failed to create DomainSocket", e);
+      throw e;
+    } finally {
+      if (!success) {
+        if (sock != null) {
+          IOUtils.closeQuietly(sock);
+        }
+        if (escapedPath != null) {
+          // Keep the real path in the entry so getPathInfo() reports it.
+          pathMap.put(escapedPath,
+              new PathInfo(escapedPath, PathState.DISABLED));
+          LOG.error("{} is disabled for {} ms due to current failure",
+              escapedPath, pathExpireMills);
+          schedulePathEnable(escapedPath, pathExpireMills);
+        }
+      }
+    }
+  }
+
+  /**
+   * Temporarily disables the configured path for {@link #getPathExpireMills()}
+   * ms. Does nothing when the service is not enabled.
+   */
+  public void disableShortCircuit() {
+    if (!isEnabled) {
+      // No path and no timer exist in this case.
+      return;
+    }
+    pathMap.put(domainSocketPath,
+        new PathInfo(domainSocketPath, PathState.DISABLED));
+    schedulePathEnable(domainSocketPath, pathExpireMills);
+  }
+
+  /** Marks {@code path} VALID again after {@code delayMills} ms. */
+  private void schedulePathEnable(String path, long delayMills) {
+    if (timer == null) {
+      return;
+    }
+    try {
+      timer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          pathMap.put(path, new PathInfo(path, PathState.VALID));
+        }
+      }, delayMills);
+    } catch (IllegalStateException e) {
+      // The timer was cancelled by close(). Do not let this mask the caller's
+      // own exception; the path simply stays disabled.
+      LOG.warn("Cannot schedule re-enabling of {}: {}", path, e.getMessage());
+    }
+  }
+
+  @VisibleForTesting
+  public void clearPathMap() {
+    pathMap.clear();
+  }
+
+  /** @return how long, in milliseconds, a failed path stays disabled. */
+  public long getPathExpireMills() {
+    return pathExpireMills;
+  }
+
+  /** Cancels the re-enable timer, if one was created. */
+  @Override
+  public void close() {
+    if (timer != null) {
+      timer.cancel();
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]