This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 54b7e6d4cd9 [IOTDB-6220] Pipe: Changed the sink loopback detection
logic to support hostName and IPv6 specification. (#11455)
54b7e6d4cd9 is described below
commit 54b7e6d4cd919081b9e08c173bffcb2e6ce2745f
Author: Caideyipi <[email protected]>
AuthorDate: Thu Nov 2 20:47:07 2023 +0800
[IOTDB-6220] Pipe: Changed the sink loopback detection logic to support
hostName and IPv6 specification. (#11455)
---
.../confignode/conf/ConfigNodeDescriptor.java | 17 ++++--
.../protocol/airgap/IoTDBAirGapConnector.java | 33 +++++++----
.../protocol/legacy/IoTDBLegacyPipeConnector.java | 23 ++++++--
.../thrift/sync/IoTDBThriftSyncConnector.java | 23 ++++++--
.../apache/iotdb/commons/utils/NodeUrlUtils.java | 65 +++++++++++++++++++++-
5 files changed, 132 insertions(+), 29 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 99cc0e6376d..a1fbb95cd6d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -41,6 +41,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.Collections;
import java.util.Properties;
public class ConfigNodeDescriptor {
@@ -890,10 +892,17 @@ public class ConfigNodeDescriptor {
* @return True if the seed_config_node points to itself
*/
public boolean isSeedConfigNode() {
- return (conf.getInternalAddress().equals(conf.getSeedConfigNode().getIp())
- || (NodeUrlUtils.isLocalAddress(conf.getInternalAddress())
- &&
NodeUrlUtils.isLocalAddress(conf.getSeedConfigNode().getIp())))
- && conf.getInternalPort() == conf.getSeedConfigNode().getPort();
+ try {
+ return
(conf.getInternalAddress().equals(conf.getSeedConfigNode().getIp())
+ || (NodeUrlUtils.containsLocalAddress(
+ Collections.singletonList(conf.getInternalAddress()))
+ && NodeUrlUtils.containsLocalAddress(
+
Collections.singletonList(conf.getSeedConfigNode().getIp()))))
+ && conf.getInternalPort() == conf.getSeedConfigNode().getPort();
+ } catch (UnknownHostException e) {
+ LOGGER.warn("Unknown host when checking seed configNode IP {}",
conf.getInternalAddress(), e);
+ return false;
+ }
}
public static ConfigNodeDescriptor getInstance() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index 427db8030c7..854355ad916 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.connector.protocol.airgap;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapELanguageConstant;
@@ -58,10 +59,12 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
import java.util.zip.CRC32;
import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
@@ -94,17 +97,27 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
Set<TEndPoint> givenNodeUrls = parseNodeUrls(validator.getParameters());
validator.validate(
- empty ->
- !(pipeConfig.getPipeAirGapReceiverEnabled()
- && (givenNodeUrls.contains(
- new TEndPoint(
- ioTDBConfig.getRpcAddress(),
pipeConfig.getPipeAirGapReceiverPort())))
- || givenNodeUrls.contains(
- new TEndPoint("127.0.0.1",
pipeConfig.getPipeAirGapReceiverPort()))
- || givenNodeUrls.contains(
- new TEndPoint("0.0.0.0",
pipeConfig.getPipeAirGapReceiverPort()))),
+ empty -> {
+ try {
+ // Ensure the sink doesn't point to the air gap receiver on
DataNode itself
+
+ if (!pipeConfig.getPipeAirGapReceiverEnabled()) {
+ return true;
+ }
+
+ return !NodeUrlUtils.containsLocalAddress(
+ givenNodeUrls.stream()
+ .filter(
+ tEndPoint -> tEndPoint.getPort() ==
pipeConfig.getPipeAirGapReceiverPort())
+ .map(TEndPoint::getIp)
+ .collect(Collectors.toList()));
+ } catch (UnknownHostException e) {
+ LOGGER.warn("Unknown host when checking pipe sink IP.", e);
+ return false;
+ }
+ },
String.format(
- "One of the endpoints %s of the receivers is pointing back to the
air gap receiver %s on sender itself",
+ "One of the endpoints %s of the receivers is pointing back to the
air gap receiver %s on sender itself, or unknown host when checking pipe sink
IP.",
givenNodeUrls,
new TEndPoint(ioTDBConfig.getRpcAddress(),
pipeConfig.getPipeAirGapReceiverPort())));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index fabf87f4bdf..c2df8ceee08 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData;
@@ -60,10 +61,12 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
+import java.util.stream.Collectors;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
@@ -122,13 +125,21 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
parameters.hasAttribute(SINK_IOTDB_IP_KEY),
parameters.hasAttribute(SINK_IOTDB_PORT_KEY))
.validate(
- empty ->
- !(givenNodeUrls.contains(
- new TEndPoint(ioTDBConfig.getRpcAddress(),
ioTDBConfig.getRpcPort()))
- || givenNodeUrls.contains(new TEndPoint("127.0.0.1",
ioTDBConfig.getRpcPort()))
- || givenNodeUrls.contains(new TEndPoint("0.0.0.0",
ioTDBConfig.getRpcPort()))),
+ empty -> {
+ try {
+ // Ensure the sink doesn't point to the legacy receiver on
DataNode itself
+ return !NodeUrlUtils.containsLocalAddress(
+ givenNodeUrls.stream()
+ .filter(tEndPoint -> tEndPoint.getPort() ==
ioTDBConfig.getRpcPort())
+ .map(TEndPoint::getIp)
+ .collect(Collectors.toList()));
+ } catch (UnknownHostException e) {
+ LOGGER.warn("Unknown host when checking pipe sink IP.", e);
+ return false;
+ }
+ },
String.format(
- "One of the endpoints %s of the receivers is pointing back to
the legacy receiver on sender %s itself",
+ "One of the endpoints %s of the receivers is pointing back to
the legacy receiver %s on sender itself, or unknown host when checking pipe
sink IP.",
givenNodeUrls,
new TEndPoint(ioTDBConfig.getRpcAddress(),
ioTDBConfig.getRpcPort())));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index ef84812be7a..15041bdf09d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftSyncPipeTransferBatchReqBuilder;
@@ -59,10 +60,12 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
public class IoTDBThriftSyncConnector extends IoTDBConnector {
@@ -88,13 +91,21 @@ public class IoTDBThriftSyncConnector extends
IoTDBConnector {
Set<TEndPoint> givenNodeUrls = parseNodeUrls(validator.getParameters());
validator.validate(
- empty ->
- !(givenNodeUrls.contains(
- new TEndPoint(ioTDBConfig.getRpcAddress(),
ioTDBConfig.getRpcPort()))
- || givenNodeUrls.contains(new TEndPoint("127.0.0.1",
ioTDBConfig.getRpcPort()))
- || givenNodeUrls.contains(new TEndPoint("0.0.0.0",
ioTDBConfig.getRpcPort()))),
+ empty -> {
+ try {
+ // Ensure the sink doesn't point to the thrift receiver on
DataNode itself
+ return !NodeUrlUtils.containsLocalAddress(
+ givenNodeUrls.stream()
+ .filter(tEndPoint -> tEndPoint.getPort() ==
ioTDBConfig.getRpcPort())
+ .map(TEndPoint::getIp)
+ .collect(Collectors.toList()));
+ } catch (UnknownHostException e) {
+ LOGGER.warn("Unknown host when checking pipe sink IP.", e);
+ return false;
+ }
+ },
String.format(
- "One of the endpoints %s of the receivers is pointing back to the
thrift receiver %s on sender itself",
+ "One of the endpoints %s of the receivers is pointing back to the
thrift receiver %s on sender itself, or unknown host when checking pipe sink
IP.",
givenNodeUrls, new TEndPoint(ioTDBConfig.getRpcAddress(),
ioTDBConfig.getRpcPort())));
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
index f54290d0d77..8cf64a137a4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
@@ -26,15 +26,22 @@ import
org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.StringJoiner;
+import java.util.stream.Collectors;
public class NodeUrlUtils {
private static final Logger logger =
LoggerFactory.getLogger(NodeUrlUtils.class);
+ public static final String WILD_CARD_ADDRESS = "0.0.0.0";
+ public static final String LOOPBACK_HOST_NAME = "localhost";
/**
* Convert TEndPoint to TEndPointUrl
*
@@ -188,10 +195,62 @@ public class NodeUrlUtils {
return parseTConfigNodeUrls(Arrays.asList(configNodeUrls.split(";")));
}
- public static boolean isLocalAddress(String ip) {
- if (ip == null) {
+ /**
+ * Detect whether any of the given addresses or host names (the list may contain both) points
to the node itself.
+ *
+ * @param addressesOrHostNames List of the addresses or host names to check
+ * @return true if at least one of the given strings points to the node itself
+ * @throws UnknownHostException Thrown when unable to resolve the given
addresses or host names
+ */
+ public static boolean containsLocalAddress(List<String> addressesOrHostNames)
+ throws UnknownHostException {
+ if (addressesOrHostNames == null) {
return false;
}
- return ip.equals("0.0.0.0") || ip.equals("127.0.0.1") ||
ip.equals("localhost");
+
+ Set<String> selfAddresses = getAllLocalAddresses();
+
+ for (String addressOrHostName : addressesOrHostNames) {
+ if (addressOrHostName == null) {
+ continue;
+ }
+ // Unify address or hostName, converting them to addresses
+ Set<String> translatedAddresses =
+ Arrays.stream(InetAddress.getAllByName(addressOrHostName))
+ .map(InetAddress::getHostAddress)
+ .collect(Collectors.toCollection(HashSet::new));
+ translatedAddresses.retainAll(selfAddresses);
+
+ if (!translatedAddresses.isEmpty()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Return all the internal, external, IPv4, IPv6 and loopback
addresses (without host names) of the
+ * node
+ *
+ * @return the local addresses set
+ * @throws UnknownHostException Thrown when unable to resolve the node's own addresses or host
names (normally not
+ * thrown)
+ */
+ public static Set<String> getAllLocalAddresses() throws UnknownHostException
{
+ // Check internal and external, IPv4 and IPv6 network addresses
+ Set<String> selfAddresses =
+
Arrays.stream(InetAddress.getAllByName(InetAddress.getLocalHost().getHostName()))
+ .map(InetAddress::getHostAddress)
+ .collect(Collectors.toCollection(HashSet::new));
+ // Check IPv4 and IPv6 loopback addresses 127.0.0.1 and 0:0:0:0:0:0:0:1
+ selfAddresses.addAll(
+ Arrays.stream(InetAddress.getAllByName(LOOPBACK_HOST_NAME))
+ .map(InetAddress::getHostAddress)
+ .collect(Collectors.toCollection(HashSet::new)));
+ // Check wildcard address 0.0.0.0
+ selfAddresses.add(WILD_CARD_ADDRESS);
+
+ return selfAddresses;
}
}