This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e4ed1964fd6 Pipe: Support "sink.host" and "connector.host" key in receiver IP specification (#12017)
e4ed1964fd6 is described below

commit e4ed1964fd61d4ebea47c9e47818c463eb7cc28b
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 5 18:03:30 2024 +0800

    Pipe: Support "sink.host" and "connector.host" key in receiver IP specification (#12017)
---
 .../java/org/apache/iotdb/pipe/api/PipeSink.java   | 49 ++++++++++++----------
 .../java/org/apache/iotdb/pipe/api/PipeSource.java | 42 ++++++++++---------
 .../config/constant/PipeConnectorConstant.java     |  2 +
 .../builtin/connector/iotdb/IoTDBConnector.java    | 34 +++++++++++----
 4 files changed, 77 insertions(+), 50 deletions(-)

diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeSink.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeSink.java
index d2a5262ec94..498a4395a7c 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeSink.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeSink.java
@@ -30,26 +30,29 @@ import 
org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 /**
  * PipeSink
  *
- * <p>PipeSink is responsible for sending events to sinks.
+ * <p>{@link PipeSink} is responsible for sending {@link Event}s to sinks.
  *
- * <p>Various network protocols can be supported by implementing different 
PipeSink classes.
+ * <p>Various network protocols can be supported by implementing different 
{@link PipeSink} classes.
  *
- * <p>The lifecycle of a PipeSink is as follows:
+ * <p>The lifecycle of a {@link PipeSink} is as follows:
  *
  * <ul>
- *   <li>When a collaboration task is created, the KV pairs of `WITH 
CONNECTOR` clause in SQL are
- *       parsed and the validation method {@link 
PipeSink#validate(PipeParameterValidator)} will be
- *       called to validate the parameters.
+ *   <li>When a collaboration task is created, the KV pairs of `WITH SINK` 
clause in SQL are parsed
+ *       and the validation method {@link 
PipeSink#validate(PipeParameterValidator)} will be called
+ *       to validate the parameters.
  *   <li>Before the collaboration task starts, the method {@link 
PipeSink#customize(PipeParameters,
- *       PipeSinkRuntimeConfiguration)} will be called to config the runtime 
behavior of the
+ *       PipeSinkRuntimeConfiguration)} will be called to configure the 
runtime behavior of the
  *       PipeSink and the method {@link PipeSink#handshake()} will be called 
to create a connection
  *       with sink.
  *   <li>While the collaboration task is in progress:
  *       <ul>
- *         <li>PipeExtractor captures the events and wraps them into three 
types of Event instances.
- *         <li>PipeProcessor processes the event and then passes them to the 
PipeSink.
- *         <li>PipeSink serializes the events into binaries and send them to 
sinks. The following 3
- *             methods will be called: {@link 
PipeSink#transfer(TabletInsertionEvent)}, {@link
+ *         <li>{@link PipeSource} captures the {@link Event}s and wraps them 
into three types of
+ *             {@link Event} instances.
+ *         <li>{@link PipeProcessor} processes the {@link Event} and then 
passes them to the {@link
+ *             PipeSink}.
+ *         <li>{@link PipeSink} serializes the {@link Event}s into binaries 
and send them to sinks.
+ *             The following 3 methods will be called: {@link
+ *             PipeSink#transfer(TabletInsertionEvent)}, {@link
  *             PipeSink#transfer(TsFileInsertionEvent)} and {@link 
PipeSink#transfer(Event)}.
  *       </ul>
  *   <li>When the collaboration task is cancelled (the `DROP PIPE` command is 
executed), the {@link
@@ -68,23 +71,23 @@ public interface PipeSink extends PipeConnector {
    * PipeSink#customize(PipeParameters, PipeSinkRuntimeConfiguration)} is 
called.
    *
    * @param validator the validator used to validate {@link PipeParameters}
-   * @throws Exception if any parameter is not valid
+   * @throws Exception if any of {@link PipeParameters} is not valid
    */
   void validate(PipeParameterValidator validator) throws Exception;
 
   /**
-   * This method is mainly used to customize PipeSink. In this method, the 
user can do the following
-   * things:
+   * This method is mainly used to customize {@link PipeSink}. In this method, 
the user can do the
+   * following things:
    *
    * <ul>
-   *   <li>Use PipeParameters to parse key-value pair attributes entered by 
the user.
-   *   <li>Set the running configurations in PipeSinkRuntimeConfiguration.
+   *   <li>Use {@link PipeParameters} to parse key-value pair attributes 
entered by the user.
+   *   <li>Set the running configurations in {@link 
PipeSinkRuntimeConfiguration}.
    * </ul>
    *
    * <p>This method is called after the method {@link 
PipeSink#validate(PipeParameterValidator)} is
    * called and before the method {@link PipeSink#handshake()} is called.
    *
-   * @param parameters used to parse the input parameters entered by the user
+   * @param parameters used to parse the input {@link PipeParameters} entered 
by the user
    * @param configuration used to set the required properties of the running 
PipeSink
    * @throws Exception the user can throw errors if necessary
    */
@@ -109,18 +112,18 @@ public interface PipeSink extends PipeConnector {
   void heartbeat() throws Exception;
 
   /**
-   * This method is used to transfer the TabletInsertionEvent.
+   * This method is used to transfer the {@link TabletInsertionEvent}.
    *
-   * @param tabletInsertionEvent TabletInsertionEvent to be transferred
+   * @param tabletInsertionEvent {@link TabletInsertionEvent} to be transferred
    * @throws PipeConnectionException if the connection is broken
    * @throws Exception the user can throw errors if necessary
    */
   void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception;
 
   /**
-   * This method is used to transfer the TsFileInsertionEvent.
+   * This method is used to transfer the {@link TsFileInsertionEvent}.
    *
-   * @param tsFileInsertionEvent TsFileInsertionEvent to be transferred
+   * @param tsFileInsertionEvent {@link TsFileInsertionEvent} to be transferred
    * @throws PipeConnectionException if the connection is broken
    * @throws Exception the user can throw errors if necessary
    */
@@ -136,9 +139,9 @@ public interface PipeSink extends PipeConnector {
   }
 
   /**
-   * This method is used to transfer the generic events, including 
HeartbeatEvent.
+   * This method is used to transfer the generic {@link Event}s, including 
HeartbeatEvent.
    *
-   * @param event Event to be transferred
+   * @param event {@link Event} to be transferred
    * @throws PipeConnectionException if the connection is broken
    * @throws Exception the user can throw errors if necessary
    */
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeSource.java 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeSource.java
index e68724055d6..def0e6e42ff 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeSource.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeSource.java
@@ -27,23 +27,23 @@ import org.apache.iotdb.pipe.api.event.Event;
 /**
  * PipeSource
  *
- * <p>PipeSource is responsible for capturing events from sources.
+ * <p>{@link PipeSource} is responsible for capturing {@link Event}s from 
sources.
  *
- * <p>Various data sources can be supported by implementing different 
PipeSource classes.
+ * <p>Various data sources can be supported by implementing different {@link 
PipeSource} classes.
  *
- * <p>The lifecycle of a PipeSource is as follows:
+ * <p>The lifecycle of a {@link PipeSource} is as follows:
  *
  * <ul>
  *   <li>When a collaboration task is created, the KV pairs of `WITH 
EXTRACTOR` clause in SQL are
  *       parsed and the validation method {@link 
PipeSource#validate(PipeParameterValidator)} will
- *       be called to validate the parameters.
+ *       be called to validate the {@link PipeParameters}.
  *   <li>Before the collaboration task starts, the method {@link
  *       PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)} 
will be called to
- *       config the runtime behavior of the PipeSource.
- *   <li>Then the method {@link PipeSource#start()} will be called to start 
the PipeSource.
+ *       configure the runtime behavior of the {@link PipeSource}.
+ *   <li>Then the method {@link PipeSource#start()} will be called to start 
the {@link PipeSource}.
  *   <li>While the collaboration task is in progress, the method {@link 
PipeSource#supply()} will be
- *       called to capture events from sources and then the events will be 
passed to the
- *       PipeProcessor.
+ *       called to capture {@link Event}s from sources and then the {@link 
Event}s will be passed to
+ *       the {@link PipeProcessor}.
  *   <li>The method {@link PipeSource#close()} will be called when the 
collaboration task is
  *       cancelled (the `DROP PIPE` command is executed).
  * </ul>
@@ -55,32 +55,32 @@ public interface PipeSource extends PipeExtractor {
    * PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)} is 
called.
    *
    * @param validator the validator used to validate {@link PipeParameters}
-   * @throws Exception if any parameter is not valid
+   * @throws Exception if any {@link PipeParameters} is invalid
    */
   void validate(PipeParameterValidator validator) throws Exception;
 
   /**
-   * This method is mainly used to customize PipeSource. In this method, the 
user can do the
+   * This method is mainly used to customize {@link PipeSource}. In this 
method, the user can do the
    * following things:
    *
    * <ul>
-   *   <li>Use PipeParameters to parse key-value pair attributes entered by 
the user.
-   *   <li>Set the running configurations in PipeSourceRuntimeConfiguration.
+   *   <li>Use {@link PipeParameters} to parse key-value pair attributes 
entered by the user.
+   *   <li>Set the running configurations in {@link 
PipeSourceRuntimeConfiguration}.
    * </ul>
    *
    * <p>This method is called after the method {@link 
PipeSource#validate(PipeParameterValidator)}
    * is called.
    *
-   * @param parameters used to parse the input parameters entered by the user
-   * @param configuration used to set the required properties of the running 
PipeSource
+   * @param parameters used to parse the input {@link PipeParameters} entered 
by the user
+   * @param configuration used to set the required properties of the running 
{@link PipeSource}
    * @throws Exception the user can throw errors if necessary
    */
   void customize(PipeParameters parameters, PipeSourceRuntimeConfiguration 
configuration)
       throws Exception;
 
   /**
-   * Start the extractor. After this method is called, events should be ready 
to be supplied by
-   * {@link PipeSource#supply()}. This method is called after {@link
+   * Start the {@link PipeSource}. After this method is called, {@link Event}s 
should be ready to be
+   * supplied by {@link PipeSource#supply()}. This method is called after 
{@link
    * PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)} is 
called.
    *
    * @throws Exception the user can throw errors if necessary
@@ -88,11 +88,13 @@ public interface PipeSource extends PipeExtractor {
   void start() throws Exception;
 
   /**
-   * Supply single event from the extractor and the caller will send the event 
to the processor.
-   * This method is called after {@link PipeSource#start()} is called.
+   * Supply single {@link Event} from the {@link PipeSource} and the caller 
will send the {@link
+   * Event} to the {@link PipeProcessor}. This method is called after {@link 
PipeSource#start()} is
+   * called.
    *
-   * @return the event to be supplied. the event may be null if the extractor 
has no more events at
-   *     the moment, but the extractor is still running for more events.
+   * @return the {@link Event} to be supplied. the {@link Event} may be {@code 
null} if the {@link
+   *     PipeSource} has no more {@link Event}s at the moment, but the {@link 
PipeSource} is still
+   *     running for more {@link Event}s.
    * @throws Exception the user can throw errors if necessary
    */
   Event supply() throws Exception;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 8426c999a6c..992f60e1e67 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -33,6 +33,8 @@ public class PipeConnectorConstant {
 
   public static final String CONNECTOR_IOTDB_IP_KEY = "connector.ip";
   public static final String SINK_IOTDB_IP_KEY = "sink.ip";
+  public static final String CONNECTOR_IOTDB_HOST_KEY = "connector.host";
+  public static final String SINK_IOTDB_HOST_KEY = "sink.host";
   public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
   public static final String SINK_IOTDB_PORT_KEY = "sink.port";
   public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = 
"connector.node-urls";
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/IoTDBConnector.java
index c183339b3be..e64134eb495 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/IoTDBConnector.java
@@ -39,10 +39,12 @@ import java.util.Set;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_HOST_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_HOST_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
@@ -59,7 +61,7 @@ public abstract class IoTDBConnector implements PipeConnector 
{
       "Exception occurred while parsing node urls from target servers: {}";
 
   private static final String PARSE_URL_ERROR_MESSAGE =
-      "Error occurred while parsing node urls from target servers, please check the specified 'ip':'port' or 'node-urls'";
+      "Error occurred while parsing node urls from target servers, please check the specified 'host':'port' or 'node-urls'";
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
@@ -67,22 +69,24 @@ public abstract class IoTDBConnector implements 
PipeConnector {
     validator.validate(
         args ->
             (boolean) args[0]
-                || ((boolean) args[1] && (boolean) args[2])
-                || (boolean) args[3]
-                || ((boolean) args[4] && (boolean) args[5]),
+                || (((boolean) args[1] || (boolean) args[2]) && (boolean) args[3])
+                || (boolean) args[4]
+                || (((boolean) args[5] || (boolean) args[6]) && (boolean) args[7]),
         String.format(
             "One of %s, %s:%s, %s, %s:%s must be specified",
             CONNECTOR_IOTDB_NODE_URLS_KEY,
-            CONNECTOR_IOTDB_IP_KEY,
+            CONNECTOR_IOTDB_HOST_KEY,
             CONNECTOR_IOTDB_PORT_KEY,
             SINK_IOTDB_NODE_URLS_KEY,
-            SINK_IOTDB_IP_KEY,
+            SINK_IOTDB_HOST_KEY,
             SINK_IOTDB_PORT_KEY),
         parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
         parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
+        parameters.hasAttribute(CONNECTOR_IOTDB_HOST_KEY),
         parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
         parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY),
         parameters.hasAttribute(SINK_IOTDB_IP_KEY),
+        parameters.hasAttribute(SINK_IOTDB_HOST_KEY),
         parameters.hasAttribute(SINK_IOTDB_PORT_KEY));
   }
 
@@ -121,6 +125,22 @@ public abstract class IoTDBConnector implements 
PipeConnector {
                 parameters.getIntByKeys(SINK_IOTDB_PORT_KEY)));
       }
 
+      if (parameters.hasAttribute(CONNECTOR_IOTDB_HOST_KEY)
+          && parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY)) {
+        givenNodeUrls.add(
+            new TEndPoint(
+                parameters.getStringByKeys(CONNECTOR_IOTDB_HOST_KEY),
+                parameters.getIntByKeys(CONNECTOR_IOTDB_PORT_KEY)));
+      }
+
+      if (parameters.hasAttribute(SINK_IOTDB_HOST_KEY)
+          && parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) {
+        givenNodeUrls.add(
+            new TEndPoint(
+                parameters.getStringByKeys(SINK_IOTDB_HOST_KEY),
+                parameters.getIntByKeys(SINK_IOTDB_PORT_KEY)));
+      }
+
       if (parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY)) {
         givenNodeUrls.addAll(
             NodeUrlUtils.parseTEndPointUrls(
@@ -145,7 +165,7 @@ public abstract class IoTDBConnector implements 
PipeConnector {
   private void checkNodeUrls(Set<TEndPoint> nodeUrls) throws 
PipeParameterNotValidException {
     for (TEndPoint nodeUrl : nodeUrls) {
       if (Objects.isNull(nodeUrl.ip) || nodeUrl.ip.isEmpty()) {
-        LOGGER.warn(PARSE_URL_ERROR_FORMATTER, "ip cannot be empty");
+        LOGGER.warn(PARSE_URL_ERROR_FORMATTER, "host cannot be empty");
         throw new PipeParameterNotValidException(PARSE_URL_ERROR_MESSAGE);
       }
       if (nodeUrl.port == 0) {

Reply via email to