This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e4ed1964fd6 Pipe: Support "sink.host" and "connector.host" key in
receiver IP specification (#12017)
e4ed1964fd6 is described below
commit e4ed1964fd61d4ebea47c9e47818c463eb7cc28b
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 5 18:03:30 2024 +0800
Pipe: Support "sink.host" and "connector.host" key in receiver IP
specification (#12017)
---
.../java/org/apache/iotdb/pipe/api/PipeSink.java | 49 ++++++++++++----------
.../java/org/apache/iotdb/pipe/api/PipeSource.java | 42 ++++++++++---------
.../config/constant/PipeConnectorConstant.java | 2 +
.../builtin/connector/iotdb/IoTDBConnector.java | 34 +++++++++++----
4 files changed, 77 insertions(+), 50 deletions(-)
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeSink.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeSink.java
index d2a5262ec94..498a4395a7c 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeSink.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeSink.java
@@ -30,26 +30,29 @@ import
org.apache.iotdb.pipe.api.exception.PipeConnectionException;
/**
* PipeSink
*
- * <p>PipeSink is responsible for sending events to sinks.
+ * <p>{@link PipeSink} is responsible for sending {@link Event}s to sinks.
*
- * <p>Various network protocols can be supported by implementing different
PipeSink classes.
+ * <p>Various network protocols can be supported by implementing different
{@link PipeSink} classes.
*
- * <p>The lifecycle of a PipeSink is as follows:
+ * <p>The lifecycle of a {@link PipeSink} is as follows:
*
* <ul>
- * <li>When a collaboration task is created, the KV pairs of `WITH
CONNECTOR` clause in SQL are
- * parsed and the validation method {@link
PipeSink#validate(PipeParameterValidator)} will be
- * called to validate the parameters.
+ * <li>When a collaboration task is created, the KV pairs of `WITH SINK`
clause in SQL are parsed
+ * and the validation method {@link
PipeSink#validate(PipeParameterValidator)} will be called
+ * to validate the parameters.
* <li>Before the collaboration task starts, the method {@link
PipeSink#customize(PipeParameters,
- * PipeSinkRuntimeConfiguration)} will be called to config the runtime
behavior of the
+ * PipeSinkRuntimeConfiguration)} will be called to configure the
runtime behavior of the
* PipeSink and the method {@link PipeSink#handshake()} will be called
to create a connection
* with sink.
* <li>While the collaboration task is in progress:
* <ul>
- * <li>PipeExtractor captures the events and wraps them into three
types of Event instances.
- * <li>PipeProcessor processes the event and then passes them to the
PipeSink.
- * <li>PipeSink serializes the events into binaries and send them to
sinks. The following 3
- * methods will be called: {@link
PipeSink#transfer(TabletInsertionEvent)}, {@link
+ * <li>{@link PipeSource} captures the {@link Event}s and wraps them
into three types of
+ * {@link Event} instances.
+ * <li>{@link PipeProcessor} processes the {@link Event}s and then
passes them to the {@link
+ * PipeSink}.
+ * <li>{@link PipeSink} serializes the {@link Event}s into binaries
and sends them to sinks.
+ * The following 3 methods will be called: {@link
+ * PipeSink#transfer(TabletInsertionEvent)}, {@link
* PipeSink#transfer(TsFileInsertionEvent)} and {@link
PipeSink#transfer(Event)}.
* </ul>
* <li>When the collaboration task is cancelled (the `DROP PIPE` command is
executed), the {@link
@@ -68,23 +71,23 @@ public interface PipeSink extends PipeConnector {
* PipeSink#customize(PipeParameters, PipeSinkRuntimeConfiguration)} is
called.
*
* @param validator the validator used to validate {@link PipeParameters}
- * @throws Exception if any parameter is not valid
+ * @throws Exception if any of {@link PipeParameters} is not valid
*/
void validate(PipeParameterValidator validator) throws Exception;
/**
- * This method is mainly used to customize PipeSink. In this method, the
user can do the following
- * things:
+ * This method is mainly used to customize {@link PipeSink}. In this method,
the user can do the
+ * following things:
*
* <ul>
- * <li>Use PipeParameters to parse key-value pair attributes entered by
the user.
- * <li>Set the running configurations in PipeSinkRuntimeConfiguration.
+ * <li>Use {@link PipeParameters} to parse key-value pair attributes
entered by the user.
+ * <li>Set the running configurations in {@link
PipeSinkRuntimeConfiguration}.
* </ul>
*
* <p>This method is called after the method {@link
PipeSink#validate(PipeParameterValidator)} is
* called and before the method {@link PipeSink#handshake()} is called.
*
- * @param parameters used to parse the input parameters entered by the user
+ * @param parameters used to parse the input {@link PipeParameters} entered
by the user
* @param configuration used to set the required properties of the running
PipeSink
* @throws Exception the user can throw errors if necessary
*/
@@ -109,18 +112,18 @@ public interface PipeSink extends PipeConnector {
void heartbeat() throws Exception;
/**
- * This method is used to transfer the TabletInsertionEvent.
+ * This method is used to transfer the {@link TabletInsertionEvent}.
*
- * @param tabletInsertionEvent TabletInsertionEvent to be transferred
+ * @param tabletInsertionEvent {@link TabletInsertionEvent} to be transferred
* @throws PipeConnectionException if the connection is broken
* @throws Exception the user can throw errors if necessary
*/
void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception;
/**
- * This method is used to transfer the TsFileInsertionEvent.
+ * This method is used to transfer the {@link TsFileInsertionEvent}.
*
- * @param tsFileInsertionEvent TsFileInsertionEvent to be transferred
+ * @param tsFileInsertionEvent {@link TsFileInsertionEvent} to be transferred
* @throws PipeConnectionException if the connection is broken
* @throws Exception the user can throw errors if necessary
*/
@@ -136,9 +139,9 @@ public interface PipeSink extends PipeConnector {
}
/**
- * This method is used to transfer the generic events, including
HeartbeatEvent.
+ * This method is used to transfer the generic {@link Event}s, including
HeartbeatEvent.
*
- * @param event Event to be transferred
+ * @param event {@link Event} to be transferred
* @throws PipeConnectionException if the connection is broken
* @throws Exception the user can throw errors if necessary
*/
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeSource.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeSource.java
index e68724055d6..def0e6e42ff 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeSource.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeSource.java
@@ -27,23 +27,23 @@ import org.apache.iotdb.pipe.api.event.Event;
/**
* PipeSource
*
- * <p>PipeSource is responsible for capturing events from sources.
+ * <p>{@link PipeSource} is responsible for capturing {@link Event}s from
sources.
*
- * <p>Various data sources can be supported by implementing different
PipeSource classes.
+ * <p>Various data sources can be supported by implementing different {@link
PipeSource} classes.
*
- * <p>The lifecycle of a PipeSource is as follows:
+ * <p>The lifecycle of a {@link PipeSource} is as follows:
*
* <ul>
* <li>When a collaboration task is created, the KV pairs of `WITH
EXTRACTOR` clause in SQL are
* parsed and the validation method {@link
PipeSource#validate(PipeParameterValidator)} will
- * be called to validate the parameters.
+ * be called to validate the {@link PipeParameters}.
* <li>Before the collaboration task starts, the method {@link
* PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)}
will be called to
- * config the runtime behavior of the PipeSource.
- * <li>Then the method {@link PipeSource#start()} will be called to start
the PipeSource.
+ * configure the runtime behavior of the {@link PipeSource}.
+ * <li>Then the method {@link PipeSource#start()} will be called to start
the {@link PipeSource}.
* <li>While the collaboration task is in progress, the method {@link
PipeSource#supply()} will be
- * called to capture events from sources and then the events will be
passed to the
- * PipeProcessor.
+ * called to capture {@link Event}s from sources and then the {@link
Event}s will be passed to
+ * the {@link PipeProcessor}.
* <li>The method {@link PipeSource#close()} will be called when the
collaboration task is
* cancelled (the `DROP PIPE` command is executed).
* </ul>
@@ -55,32 +55,32 @@ public interface PipeSource extends PipeExtractor {
* PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)} is
called.
*
* @param validator the validator used to validate {@link PipeParameters}
- * @throws Exception if any parameter is not valid
+ * @throws Exception if any {@link PipeParameters} is invalid
*/
void validate(PipeParameterValidator validator) throws Exception;
/**
- * This method is mainly used to customize PipeSource. In this method, the
user can do the
+ * This method is mainly used to customize {@link PipeSource}. In this
method, the user can do the
* following things:
*
* <ul>
- * <li>Use PipeParameters to parse key-value pair attributes entered by
the user.
- * <li>Set the running configurations in PipeSourceRuntimeConfiguration.
+ * <li>Use {@link PipeParameters} to parse key-value pair attributes
entered by the user.
+ * <li>Set the running configurations in {@link
PipeSourceRuntimeConfiguration}.
* </ul>
*
* <p>This method is called after the method {@link
PipeSource#validate(PipeParameterValidator)}
* is called.
*
- * @param parameters used to parse the input parameters entered by the user
- * @param configuration used to set the required properties of the running
PipeSource
+ * @param parameters used to parse the input {@link PipeParameters} entered
by the user
+ * @param configuration used to set the required properties of the running
{@link PipeSource}
* @throws Exception the user can throw errors if necessary
*/
void customize(PipeParameters parameters, PipeSourceRuntimeConfiguration
configuration)
throws Exception;
/**
- * Start the extractor. After this method is called, events should be ready
to be supplied by
- * {@link PipeSource#supply()}. This method is called after {@link
+ * Start the {@link PipeSource}. After this method is called, {@link Event}s
should be ready to be
+ * supplied by {@link PipeSource#supply()}. This method is called after
{@link
* PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)} is
called.
*
* @throws Exception the user can throw errors if necessary
@@ -88,11 +88,13 @@ public interface PipeSource extends PipeExtractor {
void start() throws Exception;
/**
- * Supply single event from the extractor and the caller will send the event
to the processor.
- * This method is called after {@link PipeSource#start()} is called.
+ * Supply a single {@link Event} from the {@link PipeSource} and the caller
will send the {@link
+ * Event} to the {@link PipeProcessor}. This method is called after {@link
PipeSource#start()} is
+ * called.
*
- * @return the event to be supplied. the event may be null if the extractor
has no more events at
- * the moment, but the extractor is still running for more events.
+ * @return the {@link Event} to be supplied. The {@link Event} may be {@code
null} if the {@link
+ * PipeSource} has no more {@link Event}s at the moment, but the {@link
PipeSource} is still
+ * running for more {@link Event}s.
* @throws Exception the user can throw errors if necessary
*/
Event supply() throws Exception;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 8426c999a6c..992f60e1e67 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -33,6 +33,8 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_IP_KEY = "connector.ip";
public static final String SINK_IOTDB_IP_KEY = "sink.ip";
+ public static final String CONNECTOR_IOTDB_HOST_KEY = "connector.host";
+ public static final String SINK_IOTDB_HOST_KEY = "sink.host";
public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
public static final String SINK_IOTDB_PORT_KEY = "sink.port";
public static final String CONNECTOR_IOTDB_NODE_URLS_KEY =
"connector.node-urls";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/IoTDBConnector.java
index c183339b3be..e64134eb495 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/iotdb/IoTDBConnector.java
@@ -39,10 +39,12 @@ import java.util.Set;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_HOST_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_HOST_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
@@ -59,7 +61,7 @@ public abstract class IoTDBConnector implements PipeConnector
{
"Exception occurred while parsing node urls from target servers: {}";
private static final String PARSE_URL_ERROR_MESSAGE =
- "Error occurred while parsing node urls from target servers, please
check the specified 'ip':'port' or 'node-urls'";
+ "Error occurred while parsing node urls from target servers, please
check the specified 'host':'port' or 'node-urls'";
@Override
public void validate(PipeParameterValidator validator) throws Exception {
@@ -67,22 +69,24 @@ public abstract class IoTDBConnector implements
PipeConnector {
validator.validate(
args ->
(boolean) args[0]
- || ((boolean) args[1] && (boolean) args[2])
- || (boolean) args[3]
- || ((boolean) args[4] && (boolean) args[5]),
+ || (((boolean) args[1] || (boolean) args[2]) && (boolean)
args[3])
+ || (boolean) args[4]
+ || (((boolean) args[5] || (boolean) args[6]) && (boolean)
args[7]),
String.format(
"One of %s, %s:%s, %s, %s:%s must be specified",
CONNECTOR_IOTDB_NODE_URLS_KEY,
- CONNECTOR_IOTDB_IP_KEY,
+ CONNECTOR_IOTDB_HOST_KEY,
CONNECTOR_IOTDB_PORT_KEY,
SINK_IOTDB_NODE_URLS_KEY,
- SINK_IOTDB_IP_KEY,
+ SINK_IOTDB_HOST_KEY,
SINK_IOTDB_PORT_KEY),
parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
+ parameters.hasAttribute(CONNECTOR_IOTDB_HOST_KEY),
parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY),
parameters.hasAttribute(SINK_IOTDB_IP_KEY),
+ parameters.hasAttribute(SINK_IOTDB_HOST_KEY),
parameters.hasAttribute(SINK_IOTDB_PORT_KEY));
}
@@ -121,6 +125,22 @@ public abstract class IoTDBConnector implements
PipeConnector {
parameters.getIntByKeys(SINK_IOTDB_PORT_KEY)));
}
+ if (parameters.hasAttribute(CONNECTOR_IOTDB_HOST_KEY)
+ && parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY)) {
+ givenNodeUrls.add(
+ new TEndPoint(
+ parameters.getStringByKeys(CONNECTOR_IOTDB_HOST_KEY),
+ parameters.getIntByKeys(CONNECTOR_IOTDB_PORT_KEY)));
+ }
+
+ if (parameters.hasAttribute(SINK_IOTDB_HOST_KEY)
+ && parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) {
+ givenNodeUrls.add(
+ new TEndPoint(
+ parameters.getStringByKeys(SINK_IOTDB_HOST_KEY),
+ parameters.getIntByKeys(SINK_IOTDB_PORT_KEY)));
+ }
+
if (parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY)) {
givenNodeUrls.addAll(
NodeUrlUtils.parseTEndPointUrls(
@@ -145,7 +165,7 @@ public abstract class IoTDBConnector implements
PipeConnector {
private void checkNodeUrls(Set<TEndPoint> nodeUrls) throws
PipeParameterNotValidException {
for (TEndPoint nodeUrl : nodeUrls) {
if (Objects.isNull(nodeUrl.ip) || nodeUrl.ip.isEmpty()) {
- LOGGER.warn(PARSE_URL_ERROR_FORMATTER, "ip cannot be empty");
+ LOGGER.warn(PARSE_URL_ERROR_FORMATTER, "host cannot be empty");
throw new PipeParameterNotValidException(PARSE_URL_ERROR_MESSAGE);
}
if (nodeUrl.port == 0) {