This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 54b7e6d4cd9 [IOTDB-6220] Pipe: Changed the sink loopback detection 
logic to support hostName and IPv6 specification. (#11455)
54b7e6d4cd9 is described below

commit 54b7e6d4cd919081b9e08c173bffcb2e6ce2745f
Author: Caideyipi <[email protected]>
AuthorDate: Thu Nov 2 20:47:07 2023 +0800

    [IOTDB-6220] Pipe: Changed the sink loopback detection logic to support 
hostName and IPv6 specification. (#11455)
---
 .../confignode/conf/ConfigNodeDescriptor.java      | 17 ++++--
 .../protocol/airgap/IoTDBAirGapConnector.java      | 33 +++++++----
 .../protocol/legacy/IoTDBLegacyPipeConnector.java  | 23 ++++++--
 .../thrift/sync/IoTDBThriftSyncConnector.java      | 23 ++++++--
 .../apache/iotdb/commons/utils/NodeUrlUtils.java   | 65 +++++++++++++++++++++-
 5 files changed, 132 insertions(+), 29 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 99cc0e6376d..a1fbb95cd6d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -41,6 +41,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.Collections;
 import java.util.Properties;
 
 public class ConfigNodeDescriptor {
@@ -890,10 +892,17 @@ public class ConfigNodeDescriptor {
    * @return True if the seed_config_node points to itself
    */
   public boolean isSeedConfigNode() {
-    return (conf.getInternalAddress().equals(conf.getSeedConfigNode().getIp())
-            || (NodeUrlUtils.isLocalAddress(conf.getInternalAddress())
-                && 
NodeUrlUtils.isLocalAddress(conf.getSeedConfigNode().getIp())))
-        && conf.getInternalPort() == conf.getSeedConfigNode().getPort();
+    try {
+      return 
(conf.getInternalAddress().equals(conf.getSeedConfigNode().getIp())
+              || (NodeUrlUtils.containsLocalAddress(
+                      Collections.singletonList(conf.getInternalAddress()))
+                  && NodeUrlUtils.containsLocalAddress(
+                      
Collections.singletonList(conf.getSeedConfigNode().getIp()))))
+          && conf.getInternalPort() == conf.getSeedConfigNode().getPort();
+    } catch (UnknownHostException e) {
+      LOGGER.warn("Unknown host when checking seed configNode IP {}", 
conf.getInternalAddress(), e);
+      return false;
+    }
   }
 
   public static ConfigNodeDescriptor getInstance() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index 427db8030c7..854355ad916 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.connector.protocol.airgap;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapELanguageConstant;
@@ -58,10 +59,12 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.zip.CRC32;
 
 import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
@@ -94,17 +97,27 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
     Set<TEndPoint> givenNodeUrls = parseNodeUrls(validator.getParameters());
 
     validator.validate(
-        empty ->
-            !(pipeConfig.getPipeAirGapReceiverEnabled()
-                    && (givenNodeUrls.contains(
-                        new TEndPoint(
-                            ioTDBConfig.getRpcAddress(), 
pipeConfig.getPipeAirGapReceiverPort())))
-                || givenNodeUrls.contains(
-                    new TEndPoint("127.0.0.1", 
pipeConfig.getPipeAirGapReceiverPort()))
-                || givenNodeUrls.contains(
-                    new TEndPoint("0.0.0.0", 
pipeConfig.getPipeAirGapReceiverPort()))),
+        empty -> {
+          try {
+            // Ensure the sink doesn't point to the air gap receiver on 
DataNode itself
+
+            if (!pipeConfig.getPipeAirGapReceiverEnabled()) {
+              return true;
+            }
+
+            return !NodeUrlUtils.containsLocalAddress(
+                givenNodeUrls.stream()
+                    .filter(
+                        tEndPoint -> tEndPoint.getPort() == 
pipeConfig.getPipeAirGapReceiverPort())
+                    .map(TEndPoint::getIp)
+                    .collect(Collectors.toList()));
+          } catch (UnknownHostException e) {
+            LOGGER.warn("Unknown host when checking pipe sink IP.", e);
+            return false;
+          }
+        },
         String.format(
-            "One of the endpoints %s of the receivers is pointing back to the 
air gap receiver %s on sender itself",
+            "One of the endpoints %s of the receivers is pointing back to the 
air gap receiver %s on sender itself, or unknown host when checking pipe sink 
IP.",
             givenNodeUrls,
             new TEndPoint(ioTDBConfig.getRpcAddress(), 
pipeConfig.getPipeAirGapReceiverPort())));
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index fabf87f4bdf..c2df8ceee08 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData;
@@ -60,10 +61,12 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
@@ -122,13 +125,21 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
             parameters.hasAttribute(SINK_IOTDB_IP_KEY),
             parameters.hasAttribute(SINK_IOTDB_PORT_KEY))
         .validate(
-            empty ->
-                !(givenNodeUrls.contains(
-                        new TEndPoint(ioTDBConfig.getRpcAddress(), 
ioTDBConfig.getRpcPort()))
-                    || givenNodeUrls.contains(new TEndPoint("127.0.0.1", 
ioTDBConfig.getRpcPort()))
-                    || givenNodeUrls.contains(new TEndPoint("0.0.0.0", 
ioTDBConfig.getRpcPort()))),
+            empty -> {
+              try {
+                // Ensure the sink doesn't point to the legacy receiver on 
DataNode itself
+                return !NodeUrlUtils.containsLocalAddress(
+                    givenNodeUrls.stream()
+                        .filter(tEndPoint -> tEndPoint.getPort() == 
ioTDBConfig.getRpcPort())
+                        .map(TEndPoint::getIp)
+                        .collect(Collectors.toList()));
+              } catch (UnknownHostException e) {
+                LOGGER.warn("Unknown host when checking pipe sink IP.", e);
+                return false;
+              }
+            },
             String.format(
-                "One of the endpoints %s of the receivers is pointing back to 
the legacy receiver on sender %s itself",
+                "One of the endpoints %s of the receivers is pointing back to 
the legacy receiver %s on sender itself, or unknown host when checking pipe 
sink IP.",
                 givenNodeUrls,
                 new TEndPoint(ioTDBConfig.getRpcAddress(), 
ioTDBConfig.getRpcPort())));
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index ef84812be7a..15041bdf09d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftSyncPipeTransferBatchReqBuilder;
@@ -59,10 +60,12 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class IoTDBThriftSyncConnector extends IoTDBConnector {
 
@@ -88,13 +91,21 @@ public class IoTDBThriftSyncConnector extends 
IoTDBConnector {
     Set<TEndPoint> givenNodeUrls = parseNodeUrls(validator.getParameters());
 
     validator.validate(
-        empty ->
-            !(givenNodeUrls.contains(
-                    new TEndPoint(ioTDBConfig.getRpcAddress(), 
ioTDBConfig.getRpcPort()))
-                || givenNodeUrls.contains(new TEndPoint("127.0.0.1", 
ioTDBConfig.getRpcPort()))
-                || givenNodeUrls.contains(new TEndPoint("0.0.0.0", 
ioTDBConfig.getRpcPort()))),
+        empty -> {
+          try {
+            // Ensure the sink doesn't point to the thrift receiver on 
DataNode itself
+            return !NodeUrlUtils.containsLocalAddress(
+                givenNodeUrls.stream()
+                    .filter(tEndPoint -> tEndPoint.getPort() == 
ioTDBConfig.getRpcPort())
+                    .map(TEndPoint::getIp)
+                    .collect(Collectors.toList()));
+          } catch (UnknownHostException e) {
+            LOGGER.warn("Unknown host when checking pipe sink IP.", e);
+            return false;
+          }
+        },
         String.format(
-            "One of the endpoints %s of the receivers is pointing back to the 
thrift receiver %s on sender itself",
+            "One of the endpoints %s of the receivers is pointing back to the 
thrift receiver %s on sender itself, or unknown host when checking pipe sink 
IP.",
             givenNodeUrls, new TEndPoint(ioTDBConfig.getRpcAddress(), 
ioTDBConfig.getRpcPort())));
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
index f54290d0d77..8cf64a137a4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
@@ -26,15 +26,22 @@ import 
org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.StringJoiner;
+import java.util.stream.Collectors;
 
 public class NodeUrlUtils {
 
   private static final Logger logger = 
LoggerFactory.getLogger(NodeUrlUtils.class);
 
+  public static final String WILD_CARD_ADDRESS = "0.0.0.0";
+  public static final String LOOPBACK_HOST_NAME = "localhost";
   /**
    * Convert TEndPoint to TEndPointUrl
    *
@@ -188,10 +195,62 @@ public class NodeUrlUtils {
     return parseTConfigNodeUrls(Arrays.asList(configNodeUrls.split(";")));
   }
 
-  public static boolean isLocalAddress(String ip) {
-    if (ip == null) {
+  /**
+   * Detect whether the given addresses or host names (may contain both) point 
to the node itself.
+   *
+   * @param addressesOrHostNames List of the addresses or host names to check
+   * @return true if one of the given strings points to the node itself
+   * @throws UnknownHostException Thrown when unable to resolve one of the given 
addresses or host names
+   */
+  public static boolean containsLocalAddress(List<String> addressesOrHostNames)
+      throws UnknownHostException {
+    if (addressesOrHostNames == null) {
       return false;
     }
-    return ip.equals("0.0.0.0") || ip.equals("127.0.0.1") || 
ip.equals("localhost");
+
+    Set<String> selfAddresses = getAllLocalAddresses();
+
+    for (String addressOrHostName : addressesOrHostNames) {
+      if (addressOrHostName == null) {
+        continue;
+      }
+      // Unify address or hostName, converting them to addresses
+      Set<String> translatedAddresses =
+          Arrays.stream(InetAddress.getAllByName(addressOrHostName))
+              .map(InetAddress::getHostAddress)
+              .collect(Collectors.toCollection(HashSet::new));
+      translatedAddresses.retainAll(selfAddresses);
+
+      if (!translatedAddresses.isEmpty()) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Return all the internal, external, IPv4, IPv6 and loopback 
addresses (without host names) of the
+   * node.
+   *
+   * @return the local addresses set
+   * @throws UnknownHostException Thrown when unable to resolve the node's own addresses or host 
names (normally not
+   *     thrown)
+   */
+  public static Set<String> getAllLocalAddresses() throws UnknownHostException 
{
+    // Check internal and external, IPv4 and IPv6 network addresses
+    Set<String> selfAddresses =
+        
Arrays.stream(InetAddress.getAllByName(InetAddress.getLocalHost().getHostName()))
+            .map(InetAddress::getHostAddress)
+            .collect(Collectors.toCollection(HashSet::new));
+    // Add the IPv4 and IPv6 loopback addresses (typically 127.0.0.1 and 0:0:0:0:0:0:0:1)
+    selfAddresses.addAll(
+        Arrays.stream(InetAddress.getAllByName(LOOPBACK_HOST_NAME))
+            .map(InetAddress::getHostAddress)
+            .collect(Collectors.toCollection(HashSet::new)));
+    // Add the wildcard address 0.0.0.0
+    selfAddresses.add(WILD_CARD_ADDRESS);
+
+    return selfAddresses;
   }
 }

Reply via email to