This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git
The following commit(s) were added to refs/heads/master by this push:
new 4351ea98129 [INLONG-1117][Doc] Fix Development And SDK Chapter (#1106)
4351ea98129 is described below
commit 4351ea9812945b030b1beffe21f91f9331ab96dd
Author: James Zow <[email protected]>
AuthorDate: Tue Feb 11 16:12:48 2025 +0800
[INLONG-1117][Doc] Fix Development And SDK Chapter (#1106)
* Fix: Fix code block formatting issues and highlight tags
* Fix: Add additionalLanguages attribute to resolve code highlighting
* Fix(Doc): zh-CN Code block formatting issues and highlight tags
* Fix(Doc): Modify code block labels and check code
* Add supplementary code labels
ini and nginx support
---
docs/development/binary_protocol/agent.md | 18 +--
docs/development/binary_protocol/audit_msg.md | 15 +-
docs/development/binary_protocol/inlong_msg.md | 17 +-
docs/development/binary_protocol/tubemq_binary.md | 47 +++---
docs/development/extension_agent/agent.md | 20 +--
.../how_to_write_plugin_dashboard.md | 2 +-
.../inlong_manager_shiro_plugin.md | 11 +-
.../extension_sort/custom_flink_metrics.md | 1 +
.../extension_sort/extension_connector.md | 5 +-
.../extension_sort/offline_data_sync.md | 46 +++---
.../extension_transform/transform_udf.md | 175 +++++++++++----------
docs/development/how_to_build.md | 6 +-
docs/sdk/dataproxy-sdk/cpp.md | 22 +--
docs/sdk/dataproxy-sdk/go.md | 2 +-
docs/sdk/dataproxy-sdk/java.md | 10 +-
docs/sdk/dataproxy-sdk/python.md | 2 +-
docs/sdk/manager-sdk/example.md | 7 +-
docs/sdk/tubemq-sdk/cpp.md | 1 -
docusaurus.config.js | 1 +
.../current/development/binary_protocol/agent.md | 18 +--
.../development/binary_protocol/audit_msg.md | 15 +-
.../development/binary_protocol/inlong_msg.md | 17 +-
.../development/binary_protocol/tubemq_binary.md | 67 ++++----
.../current/development/extension_agent/agent.md | 20 +--
.../how_to_write_plugin_dashboard.md | 2 +-
.../inlong_manager_shiro_plugin.md | 13 +-
.../extension_sort/custom_flink_metrics.md | 129 ++++++++-------
.../extension_sort/extension_connector.md | 5 +-
.../extension_sort/offline_data_sync.md | 46 +++---
.../extension_transform/transform_udf.md | 175 +++++++++++----------
.../current/development/how_to_build.md | 6 +-
.../current/sdk/dataproxy-sdk/cpp.md | 16 +-
.../current/sdk/dataproxy-sdk/go.md | 2 +-
.../current/sdk/dataproxy-sdk/java.md | 10 +-
.../current/sdk/dataproxy-sdk/python.md | 2 +-
.../current/sdk/manager-sdk/example.md | 9 +-
.../current/sdk/tubemq-sdk/cpp.md | 1 -
37 files changed, 486 insertions(+), 475 deletions(-)
diff --git a/docs/development/binary_protocol/agent.md
b/docs/development/binary_protocol/agent.md
index ee651cfb920..f711336d4ff 100644
--- a/docs/development/binary_protocol/agent.md
+++ b/docs/development/binary_protocol/agent.md
@@ -136,16 +136,16 @@ The original data content is provided by dataList;
Extended attributes are provi
```java
private class AgentSenderCallback implements SendMessageCallback {
- private final SenderMessage message;
+ private final SenderMessage message;
- AgentSenderCallback(SenderMessage message, int retry) {
- this.message = message;
- }
-
- @Override
- public void onMessageAck(SendResult result) {
- ...
- }
+ AgentSenderCallback(SenderMessage message, int retry) {
+ this.message = message;
+ }
+
+ @Override
+ public void onMessageAck(SendResult result) {
+ ...
+ }
}
```
The onMessageAck method of the callback object will carry the sending result.
After returning success, it will iterate through the
diff --git a/docs/development/binary_protocol/audit_msg.md
b/docs/development/binary_protocol/audit_msg.md
index 18ae21f2979..086890bd38e 100644
--- a/docs/development/binary_protocol/audit_msg.md
+++ b/docs/development/binary_protocol/audit_msg.md
@@ -171,7 +171,7 @@ data into storage systems such as ClickHouse, MySQL, etc.
The specific storage t
### ClickHouse table Schema
-```clickhouse
+```sql
CREATE TABLE apache_inlong_audit.audit_data
(
`log_ts` DateTime COMMENT 'Log timestamp',
@@ -189,12 +189,11 @@ CREATE TABLE apache_inlong_audit.audit_data
`size` Int64 COMMENT 'Message size',
`delay` Int64 COMMENT 'Message delay',
`update_time` DateTime COMMENT 'Update time'
-)
- ENGINE = ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}',
'{replica}')
- PARTITION BY toDate(log_ts)
- ORDER BY (log_ts, audit_id, inlong_group_id, inlong_stream_id,
audit_tag, audit_version, ip)
- TTL toDateTime(log_ts) + toIntervalDay(8)
- SETTINGS index_granularity = 8192
+) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}',
'{replica}')
+ PARTITION BY toDate(log_ts)
+ ORDER BY (log_ts, audit_id, inlong_group_id, inlong_stream_id, audit_tag,
audit_version, ip)
+ TTL toDateTime(log_ts) + toIntervalDay(8)
+ SETTINGS index_granularity = 8192
```
As described above, the table uses the ReplicatedMergeTree storage engine to
achieve distributed storage and high
@@ -203,7 +202,7 @@ audit_id, inlong_group_id, inlong_stream_id, audit_tag,
audit_version, ip ) to o
### MySQL table Schema
-```mysql
+```sql
CREATE TABLE IF NOT EXISTS `audit_data`
(
`id` int(32) NOT NULL PRIMARY KEY AUTO_INCREMENT
COMMENT 'Incremental primary key',
diff --git a/docs/development/binary_protocol/inlong_msg.md
b/docs/development/binary_protocol/inlong_msg.md
index d1f157b8bc5..4985421c862 100644
--- a/docs/development/binary_protocol/inlong_msg.md
+++ b/docs/development/binary_protocol/inlong_msg.md
@@ -20,11 +20,11 @@ InLongMsg is a binary data packet in a custom format, which
consists of a format
The Magic field has 4 valid values in the current implementation of InLongMsg,
which respectively identify 4 different data versions that can be carried in
the Payload part (MAGIC0 is an invalid value):
```java
- private static final byte[] MAGIC0 = {(byte) 0xf, (byte) 0x0};
- private static final byte[] MAGIC1 = {(byte) 0xf, (byte) 0x1};
- private static final byte[] MAGIC2 = {(byte) 0xf, (byte) 0x2};
- private static final byte[] MAGIC3 = {(byte) 0xf, (byte) 0x3};
- private static final byte[] MAGIC4 = {(byte) 0xf, (byte) 0x4};
+private static final byte[] MAGIC0 = {(byte) 0xf, (byte) 0x0};
+private static final byte[] MAGIC1 = {(byte) 0xf, (byte) 0x1};
+private static final byte[] MAGIC2 = {(byte) 0xf, (byte) 0x2};
+private static final byte[] MAGIC3 = {(byte) 0xf, (byte) 0x3};
+private static final byte[] MAGIC4 = {(byte) 0xf, (byte) 0x4};
```
The Payload part carries data content in the corresponding format according to
the definition of the above Magic field. Regardless of the format used, these
contents are ultimately mapped to the original data information reported by the
user according to {attribute set, single data} or {attribute set, multiple
data}.
Next, we begin to introduce the corresponding Payload definitions according to
different Magic version values.
@@ -150,14 +150,13 @@ The data consumed directly from InLong's message queue
(InLong TubeMQ or Pulsar)
### Add Maven dependency
-<pre><code parentName="pre">
-{`<dependency>
+```xml
+<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>inlong-common</artifactId>
<version>${siteVariables.inLongVersion}</version>
</dependency>
-`}
-</code></pre>
+```
### Add Parse Method
diff --git a/docs/development/binary_protocol/tubemq_binary.md
b/docs/development/binary_protocol/tubemq_binary.md
index c8ae862a6dd..a60e545480f 100644
--- a/docs/development/binary_protocol/tubemq_binary.md
+++ b/docs/development/binary_protocol/tubemq_binary.md
@@ -176,13 +176,13 @@ message RegisterResponseM2P {
- clientId:Identifies the Producer object. The ID value is constructed when
the Producer is started and is valid during the Producer life cycle. The
current construction rules of the Java version of the SDK are:
```java
- ClientId = consumerGroup + "_"
- + AddressUtils.getLocalAddress() + "_" // local ip (IPV4)
- + pid + "_" // processId
- + timestamp + "_" // timestamp
- + counter + "_" // increament counter
- + consumerType + "_" // type of consumer,including Pull and Push
- + clientVersion; // version for client
+ ClientId = consumerGroup + "_"
+ + AddressUtils.getLocalAddress() + "_" // local ip (IPV4)
+ + pid + "_" // processId
+ + timestamp + "_" // timestamp
+ + counter + "_" // increament counter
+ + consumerType + "_" // type of consumer,including Pull and Push
+ + clientVersion; // version for client
```
It is recommended that other languages add the above mark to facilitate
troubleshooting;
@@ -213,7 +213,7 @@ message RegisterResponseM2P {
this.port = Integer.parseInt(strBrokers[2]);
}
this.buildStrInfo();
- }
+ }
```
- authorizedInfo:Master provides authorization information in the following
format:
@@ -352,22 +352,21 @@ message SendMessageResponseB2P {
```
- data: Binary byte stream information of Message, implemented as follows:
- ```Java
- private byte[] encodePayload(final Message message) {
- final byte[] payload = message.getData();
- final String attribute = message.getAttribute();
- if (TStringUtils.isBlank(attribute)) {
- return payload;
- }
- byte[] attrData = StringUtils.getBytesUtf8(attribute);
- final ByteBuffer buffer =
- ByteBuffer.allocate(4 + attrData.length + payload.length);
- buffer.putInt(attrData.length);
- buffer.put(attrData);
- buffer.put(payload);
- return buffer.array();
- }
- ```
+```java
+private byte[] encodePayload(final Message message) {
+ final byte[] payload = message.getData();
+ final String attribute = message.getAttribute();
+ if (TStringUtils.isBlank(attribute)) {
+ return payload;
+ }
+ byte[] attrData = StringUtils.getBytesUtf8(attribute);
+    final ByteBuffer buffer = ByteBuffer.allocate(4 + attrData.length + payload.length);
+ buffer.putInt(attrData.length);
+ buffer.put(attrData);
+ buffer.put(payload);
+ return buffer.array();
+}
+```
- sentAddr: IPv4 of the local machine where the SDK is located. Here, the IP
address is converted into a 32-bit digital ID;
diff --git a/docs/development/extension_agent/agent.md
b/docs/development/extension_agent/agent.md
index 8618f2f821f..01118f5ddd4 100644
--- a/docs/development/extension_agent/agent.md
+++ b/docs/development/extension_agent/agent.md
@@ -151,24 +151,24 @@ case PULSAR:
## Offset control
```java
- protected class SourceData {
+protected class SourceData {
- private byte[] data;
- private Long offset;
- }
+ private byte[] data;
+ private Long offset;
+}
```
```java
- protected List<SourceData> readFromSource() {
- return null;
- }
+protected List<SourceData> readFromSource() {
+ return null;
+}
```
We can see that when the Source reads data, each piece of data will record its
corresponding Offset. This Offset will be automatically recorded by the Agent
after the Sink is successfully written.
When Source is initialized, its corresponding Offset will be automatically
read and stored in the member variable offsetProfile of AbstractSource. You can
use offsetProfile.getOffset() to
get its Offset for initializing the data source.
```java
- protected void initOffset() {
- offsetProfile = OffsetManager.getInstance().getOffset(taskId,
instanceId);
- }
+protected void initOffset() {
+ offsetProfile = OffsetManager.getInstance().getOffset(taskId, instanceId);
+}
```
## Test
diff --git
a/docs/development/extension_dashboard/how_to_write_plugin_dashboard.md
b/docs/development/extension_dashboard/how_to_write_plugin_dashboard.md
index 292662883ca..5875c333d38 100644
--- a/docs/development/extension_dashboard/how_to_write_plugin_dashboard.md
+++ b/docs/development/extension_dashboard/how_to_write_plugin_dashboard.md
@@ -43,7 +43,7 @@ On the view, we provide two basic view classes, each plugin
can implement one or
Below is a basic example, in the plugin, a class that communicates with
backend is implemented, containing 3 fields (username, password, format). Among
them, `BasicInfo` comes from their different base type classes.
-```ts
+```typescript
import { DataWithBackend } from '@/metas/DataWithBackend';
import { RenderRow } from '@/metas/RenderRow';
import { RenderList } from '@/metas/RenderList';
diff --git a/docs/development/extension_manager/inlong_manager_shiro_plugin.md
b/docs/development/extension_manager/inlong_manager_shiro_plugin.md
index 2619144cb98..1b5bff96294 100644
--- a/docs/development/extension_manager/inlong_manager_shiro_plugin.md
+++ b/docs/development/extension_manager/inlong_manager_shiro_plugin.md
@@ -10,19 +10,18 @@ The Apache Shiro framework is used in the inlong manager to
realize the function
## Dependency
- Add Maven Dependency
-<pre><code parentName="pre">
-{`<dependency>
+```xml
+<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>manager-common</artifactId>
<version>${siteVariables.inLongVersion}</version>
</dependency>
-`}
-</code></pre>
+```
## Code
- Implement the following interfaces
```java
-org.apache.inlong.manager.common.auth.InlongShiro
+package org.apache.inlong.manager.common.auth.InlongShiro
public interface InlongShiro {
@@ -51,6 +50,6 @@ public class InlongShiroImpl implements InlongShiro {
```
- Modify the application.properties under the manager web module
-```java
+```properties
inlong.auth.type=Custom
```
\ No newline at end of file
diff --git a/docs/development/extension_sort/custom_flink_metrics.md
b/docs/development/extension_sort/custom_flink_metrics.md
index 0418be22fb4..432799d5e36 100644
--- a/docs/development/extension_sort/custom_flink_metrics.md
+++ b/docs/development/extension_sort/custom_flink_metrics.md
@@ -137,6 +137,7 @@ Using `sort-end-to-end-tests` located in the
`inlong-sort/sort-end-to-end-tests/
```bash
mvn test -Dtest=Postgres2StarRocksTest
+ ```
Note: You may want to insert test code or construct specific data to trigger
`incDeserializeError()` and ensure your metrics are functioning as expected.
diff --git a/docs/development/extension_sort/extension_connector.md
b/docs/development/extension_sort/extension_connector.md
index 76f58968c2a..e79816a2dd4 100644
--- a/docs/development/extension_sort/extension_connector.md
+++ b/docs/development/extension_sort/extension_connector.md
@@ -68,7 +68,7 @@
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Extra
Specify the connector in the implemented ExtractNode.
-```Java
+```java
// Inherit ExtractNode class and implement specific classes, such as
MongoExtractNode
@EqualsAndHashCode(callSuper = true)
@JsonTypeName("MongoExtract")
@@ -255,7 +255,7 @@ Inlong Sort encapsulates the metric reporting process in
the `org.apache.inlong.
The common practice is to pass parameters such as the InLong Audit address
when constructing the Source/Sink, and initialize the
`SourceExactlyMetric/SinkExactlyMetric` object when calling the open() method
to initialize the Source/Sink operator. After processing the actual data, call
the corresponding audit reporting method.
-```
+```java
public class StarRocksDynamicSinkFunctionV2<T> extends
StarRocksDynamicSinkFunctionBase<T> {
private static final long serialVersionUID = 1L;
@@ -310,4 +310,5 @@ public class StarRocksDynamicSinkFunctionV2<T> extends
StarRocksDynamicSinkFunct
sinkExactlyPropagateMetric.invoke(1, getDataSize(value),
schemaUtils.getDataTime(data));
}
}
+}
```
diff --git a/docs/development/extension_sort/offline_data_sync.md
b/docs/development/extension_sort/offline_data_sync.md
index eb38c3aabc4..aceb7bb5fa0 100644
--- a/docs/development/extension_sort/offline_data_sync.md
+++ b/docs/development/extension_sort/offline_data_sync.md
@@ -21,12 +21,12 @@ The offline data source reuses the Flink Connector from
real-time synchronizatio
Flink's Source provides interfaces to set data boundaries:
```java
- /**
- * Get the boundedness of this source.
- *
- * @return the boundedness of this source.
- */
- Boundedness getBoundedness();
+/**
+ * Get the boundedness of this source.
+ *
+ * @return the boundedness of this source.
+ */
+Boundedness getBoundedness();
```
Boundedness is an enumeration type with two values: BOUNDED and
CONTINUOUS_UNBOUNDED.
@@ -69,20 +69,20 @@ public abstract class ExtractNode implements Node {
In `PulsarExtractNode`, the Boundaries information will be configured into the
relevant parameters of the Pulsar Connector.:
```java
@Override
- public void fillInBoundaries(Boundaries boundaries) {
- super.fillInBoundaries(boundaries);
- BoundaryType boundaryType = boundaries.getBoundaryType();
- String lowerBoundary = boundaries.getLowerBound();
- String upperBoundary = boundaries.getUpperBound();
- if (Objects.requireNonNull(boundaryType) == BoundaryType.TIME) {
- // set time boundaries
- sourceBoundaryOptions.put("source.start.publish-time",
lowerBoundary);
- sourceBoundaryOptions.put("source.stop.at-publish-time",
upperBoundary);
- log.info("Filled in source boundaries options");
- } else {
- log.warn("Not supported boundary type: {}", boundaryType);
- }
+public void fillInBoundaries(Boundaries boundaries) {
+ super.fillInBoundaries(boundaries);
+ BoundaryType boundaryType = boundaries.getBoundaryType();
+ String lowerBoundary = boundaries.getLowerBound();
+ String upperBoundary = boundaries.getUpperBound();
+ if (Objects.requireNonNull(boundaryType) == BoundaryType.TIME) {
+ // set time boundaries
+ sourceBoundaryOptions.put("source.start.publish-time", lowerBoundary);
+        sourceBoundaryOptions.put("source.stop.at-publish-time", upperBoundary);
+        log.info("Filled in source boundaries options");
+ } else {
+ log.warn("Not supported boundary type: {}", boundaryType);
}
+}
```
These parameters will be recognized by the PulsarSource, and during the
initialization of the PulsarSource, a `BoundedStopCursor` will be set for the
Source.
```java
@@ -108,10 +108,10 @@ public ScanRuntimeProvider
getScanRuntimeProvider(ScanContext context) {
If a `BoundedStopCursor` is configured, the Source's boundedness property will
be set to `Boundedness.BOUNDED`.
```java
public PulsarSourceBuilder<OUT> setBoundedStopCursor(StopCursor stopCursor) {
- this.boundedness = Boundedness.BOUNDED;
- this.stopCursor = checkNotNull(stopCursor);
- return this;
- }
+ this.boundedness = Boundedness.BOUNDED;
+ this.stopCursor = checkNotNull(stopCursor);
+ return this;
+}
```
This way, the Flink engine can recognize that this is a bounded Source,
allowing it to process data using a batch approach.
diff --git a/docs/development/extension_transform/transform_udf.md
b/docs/development/extension_transform/transform_udf.md
index 75ec3afc66d..1fcb0bdc23e 100644
--- a/docs/development/extension_transform/transform_udf.md
+++ b/docs/development/extension_transform/transform_udf.md
@@ -124,14 +124,14 @@ Add the corresponding @TransformParser annotation to the
parser class. Type pars
Add a parameterized constructor and related member variables to the parser
class. In the constructor, parse the input expression and convert it into the
corresponding type object. Taking AdditionParser as an example:
```java
- private final ValueParser left;
+private final ValueParser left;
- private final ValueParser right;
+private final ValueParser right;
- public AdditionParser(Addition expr) {
- this.left = OperatorTools.buildParser(expr.getLeftExpression());
- this.right = OperatorTools.buildParser(expr.getRightExpression());
- }
+public AdditionParser(Addition expr) {
+ this.left = OperatorTools.buildParser(expr.getLeftExpression());
+ this.right = OperatorTools.buildParser(expr.getRightExpression());
+}
```
## Parsing Implement
@@ -139,54 +139,54 @@ Add a parameterized constructor and related member
variables to the parser class
Override the parse method. If the parser needs to perform further processing
on the type object parsed in the previous step, you can implement the
corresponding processing logic in this method. Otherwise, just return the type
object parsed in the previous step directly. Taking AdditionParser as an
example:
```java
- @Override
- public Object parse(SourceData sourceData, int rowIndex, Context context) {
- if (this.left instanceof IntervalParser && this.right instanceof
IntervalParser) {
- return null;
- } else if (this.left instanceof IntervalParser || this.right
instanceof IntervalParser) {
- IntervalParser intervalParser = null;
- ValueParser dateParser = null;
- if (this.left instanceof IntervalParser) {
- intervalParser = (IntervalParser) this.left;
- dateParser = this.right;
- } else {
- intervalParser = (IntervalParser) this.right;
- dateParser = this.left;
- }
- Object intervalPairObj = intervalParser.parse(sourceData,
rowIndex, context);
- Object dateObj = dateParser.parse(sourceData, rowIndex, context);
- if (intervalPairObj == null || dateObj == null) {
- return null;
- }
- return DateUtil.dateAdd(OperatorTools.parseString(dateObj),
- (Pair<Integer, Map<ChronoField, Long>>) intervalPairObj,
1);
+@Override
+public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ if (this.left instanceof IntervalParser && this.right instanceof
IntervalParser) {
+ return null;
+ } else if (this.left instanceof IntervalParser || this.right instanceof
IntervalParser) {
+ IntervalParser intervalParser = null;
+ ValueParser dateParser = null;
+ if (this.left instanceof IntervalParser) {
+ intervalParser = (IntervalParser) this.left;
+ dateParser = this.right;
} else {
- return numericalOperation(sourceData, rowIndex, context);
+ intervalParser = (IntervalParser) this.right;
+ dateParser = this.left;
+ }
+ Object intervalPairObj = intervalParser.parse(sourceData, rowIndex,
context);
+ Object dateObj = dateParser.parse(sourceData, rowIndex, context);
+ if (intervalPairObj == null || dateObj == null) {
+ return null;
}
+ return DateUtil.dateAdd(OperatorTools.parseString(dateObj),
+ (Pair<Integer, Map<ChronoField, Long>>) intervalPairObj, 1);
+ } else {
+ return numericalOperation(sourceData, rowIndex, context);
}
+}
```
## Add Unit Test Code
Each parser class needs to pass unit tests to verify whether the logic is
correct. The unit test class is located in this directory. All unit test
functions for each parser are placed in the same unit test class, and the unit
test class is named in the format of Test + Parser Name + Parser, taking
TestAdditionParser as an example:
```java
- @Test
- public void testAdditionParser() throws Exception {
- String transformSql = null;
- TransformConfig config = null;
- TransformProcessor<String, String> processor = null;
- List<String> output = null;
-
- transformSql = "select numeric1 + numeric2 from source";
- config = new TransformConfig(transformSql);
- processor = TransformProcessor
- .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
- SinkEncoderFactory.createKvEncoder(kvSink));
- // case1: 1 + 10
- output = processor.transform("1|10||||", new HashMap<>());
- Assert.assertEquals(1, output.size());
- Assert.assertEquals("result=11", output.get(0));
- }
+@Test
+public void testAdditionParser() throws Exception {
+ String transformSql = null;
+ TransformConfig config = null;
+ TransformProcessor<String, String> processor = null;
+ List<String> output = null;
+
+ transformSql = "select numeric1 + numeric2 from source";
+ config = new TransformConfig(transformSql);
+ processor = TransformProcessor
+ .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ // case1: 1 + 10
+ output = processor.transform("1|10||||", new HashMap<>());
+ Assert.assertEquals(1, output.size());
+ Assert.assertEquals("result=11", output.get(0));
+}
```
After the above steps, congratulations on completing the implementation of a
new parser class, and you can submit your code to the community. The complete
code of AdditionParser can be seen at [code
link](https://github.com/apache/inlong/blob/master/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java)
@@ -206,10 +206,7 @@ After creating the class, build the basic framework of the
code, taking AndOpera
public class AndOperator implements ExpressionOperator {
@Override
- public boolean check(SourceData sourceData, int rowIndex, Context context)
{
-
- }
-
+ public boolean check(SourceData sourceData, int rowIndex, Context context)
{}
}
```
Add the corresponding @TransformOperator annotation to the logical operator
class. The operator class needs to implement the ExpressionOperator interface
and override the check method in the interface.
@@ -243,43 +240,49 @@ public boolean check(SourceData sourceData, int rowIndex,
Context context) {
## Add Unit Test Code
Each logical operator class needs to pass unit tests to verify whether the
logic is correct. The unit test class is located in this
[directory](https://github.com/apache/inlong/tree/master/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/operator).
All unit test functions for each logical operator are placed in the same unit
test class, and the unit test class is named in the format of Test + Logical
Operator Name + Operator, taking TestAndOperator as an example:
```java
- public void testAndOperator() throws Exception {
- String transformSql = "select if((string2 < 4) and (numeric4 > 5),1,0)
from source";
- TransformConfig config = new TransformConfig(transformSql);
- // case1: "3.14159265358979323846|3a|4|4"
- TransformProcessor<String, String> processor = TransformProcessor
- .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
- SinkEncoderFactory.createKvEncoder(kvSink));
- List<String> output1 =
processor.transform("3.14159265358979323846|3a|4|4");
- Assert.assertEquals(1, output1.size());
- Assert.assertEquals(output1.get(0), "result=0");
- // case2: "3.14159265358979323846|5|4|8"
- List<String> output2 =
processor.transform("3.14159265358979323846|5|4|8");
- Assert.assertEquals(1, output1.size());
- Assert.assertEquals(output2.get(0), "result=0");
- // case3: "3.14159265358979323846|3|4|8"
- List<String> output3 =
processor.transform("3.14159265358979323846|3|4|8");
- Assert.assertEquals(1, output1.size());
- Assert.assertEquals(output3.get(0), "result=1");
-
- transformSql = "select if((numeric3 < 4) and (numeric4 > 5),1,0) from
source";
- config = new TransformConfig(transformSql);
- // case4: "3.14159265358979323846|4|4|8"
- processor = TransformProcessor
- .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
- SinkEncoderFactory.createKvEncoder(kvSink));
- List<String> output4 =
processor.transform("3.14159265358979323846|4|4|8");
- Assert.assertEquals(1, output1.size());
- Assert.assertEquals(output4.get(0), "result=0");
- // case5: "3.14159265358979323846|4|3.2|4"
- List<String> output5 =
processor.transform("3.14159265358979323846|4|3.2|4");
- Assert.assertEquals(1, output1.size());
- Assert.assertEquals(output5.get(0), "result=0");
- // case6: "3.14159265358979323846|4|3.2|8"
- List<String> output6 =
processor.transform("3.14159265358979323846|4|3.2|8");
- Assert.assertEquals(1, output1.size());
- Assert.assertEquals(output6.get(0), "result=1");
- }
+public void testAndOperator() throws Exception {
+ String transformSql = "select if((string2 < 4) and (numeric4 > 5),1,0)
from source";
+ TransformConfig config = new TransformConfig(transformSql);
+
+ // case1: "3.14159265358979323846|3a|4|4"
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output1 =
processor.transform("3.14159265358979323846|3a|4|4");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "result=0");
+
+ // case2: "3.14159265358979323846|5|4|8"
+ List<String> output2 = processor.transform("3.14159265358979323846|5|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output2.get(0), "result=0");
+
+ // case3: "3.14159265358979323846|3|4|8"
+ List<String> output3 = processor.transform("3.14159265358979323846|3|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output3.get(0), "result=1");
+
+ transformSql = "select if((numeric3 < 4) and (numeric4 > 5),1,0) from
source";
+ config = new TransformConfig(transformSql);
+
+ // case4: "3.14159265358979323846|4|4|8"
+ processor = TransformProcessor
+ .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output4 = processor.transform("3.14159265358979323846|4|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output4.get(0), "result=0");
+
+ // case5: "3.14159265358979323846|4|3.2|4"
+ List<String> output5 =
processor.transform("3.14159265358979323846|4|3.2|4");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output5.get(0), "result=0");
+
+ // case6: "3.14159265358979323846|4|3.2|8"
+ List<String> output6 =
processor.transform("3.14159265358979323846|4|3.2|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output6.get(0), "result=1");
+}
```
After the above steps, congratulations on completing the implementation of a
new logical operator class, and you can submit your code to the community. The
complete code of AndOperator can be seen at [code
link](https://github.com/apache/inlong/blob/master/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java)
diff --git a/docs/development/how_to_build.md b/docs/development/how_to_build.md
index d28aae660a1..6c790fd7167 100644
--- a/docs/development/how_to_build.md
+++ b/docs/development/how_to_build.md
@@ -9,11 +9,11 @@ Download Source Code from [InLong Download
Page](https://inlong.apache.org/downl
- Java [JDK 8](https://adoptopenjdk.net/?variant=openjdk8)
- Maven 3.6.1+
-```
+```shell
$ mvn clean install -DskipTests
```
(Optional) Compile using docker image:
-```
+```shell
$ docker pull maven:3.6-openjdk-8
$ docker run -v `pwd`:/inlong -w /inlong maven:3.6-openjdk-8 mvn clean
install -DskipTests
```
@@ -33,6 +33,6 @@ inlong-dashboard
## Build Docker Images
- [Docker](https://docs.docker.com/engine/install/) 19.03.1+
-```
+```shell
mvn clean package -DskipTests -Pdocker
```
\ No newline at end of file
diff --git a/docs/sdk/dataproxy-sdk/cpp.md b/docs/sdk/dataproxy-sdk/cpp.md
index e69761df2a9..ff30155b880 100644
--- a/docs/sdk/dataproxy-sdk/cpp.md
+++ b/docs/sdk/dataproxy-sdk/cpp.md
@@ -23,14 +23,14 @@ SDK
[send_demo.cc](https://github.com/apache/inlong/blob/master/inlong-sdk/datap
SDK supports a process to create one SDK instance, which is multi-thread safe.
It also supports a process to create
multiple SDK instances. Each SDK instance is independent of each other and
each SDK instance is also thread-safe
- Create SDK instance object
-```
+```cpp
InLongApi inlong_api
```
- object instance initialization
Configuration files are in json format, see [Config file
description](#Appendix:Config File Description), initialize
the SDK through the configuration file:
-```
+```cpp
// Initialize the SDK, the parameter is the path name of the configuration
file; a return value of zero indicates successful initialization
int32_t result = inlong_api.InitApi("/home/conf/config.json");
```
@@ -39,22 +39,24 @@ int32_t result =
inlong_api.InitApi("/home/conf/config.json");
The SDK supports single (recommended) and batch sending, both of which are
sent in asynchronous mode, and the data
reporting interface is thread-safe. Before data reporting, the callback
function can be set to perform callback
processing when the data transmission fails. The callback function signature
is as follows:
-```
-int32_t callBackFunc(const char* inlong_group_id, const char*
inlong_stream_id, const char* msg, int32_t msg_len, const int64_t report_time,
const char* client_ip);
-```
-
-- Single data reporting interface
-```
-// Return value: zero means sending is successful, non-zero means failure, see
SDKInvalidReuslt in tc_api.h for specific exception return value
+```cpp
int32_t CallBackFunc(const char* inlong_group_id, const char* inlong_stream_id,
const char* msg, int32_t msg_len,
const int64_t report_time,
const char* client_ip);
```
+- Single data reporting interface
+```cpp
+// Return value: zero means sending is successful, non-zero means failure, see
SDKInvalidReuslt in tc_api.h for specific exception return value
+int32_t Send(const char *inlong_group_id, const char *inlong_stream_id,
+ const char *msg, int32_t msg_len,
+ UserCallBack call_back = nullptr)
+```
+
### Close SDK
Call the close interface to close the SDK:
-```
+```cpp
// A return value of zero means that the shutdown is successful, and
subsequent data reporting cannot be performed
// max_waitms:The maximum number of milliseconds to wait before closing the
SDK, waiting for the completion of the SDK internal data sending
int32_t CloseApi(int32_t max_waitms);
diff --git a/docs/sdk/dataproxy-sdk/go.md b/docs/sdk/dataproxy-sdk/go.md
index bfeebc45c67..8908e62ee61 100755
--- a/docs/sdk/dataproxy-sdk/go.md
+++ b/docs/sdk/dataproxy-sdk/go.md
@@ -22,7 +22,7 @@ Basically, there are 3 steps to produce messages:
```go
client, err := dataproxy.NewClient(
dataproxy.WithGroupID("test"),
-dataproxy.WithURL("http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList"),
+
dataproxy.WithURL("http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList"),
dataproxy.WithMetricsName("test"),
)
diff --git a/docs/sdk/dataproxy-sdk/java.md b/docs/sdk/dataproxy-sdk/java.md
index d8d09633aea..49ce44f51e8 100644
--- a/docs/sdk/dataproxy-sdk/java.md
+++ b/docs/sdk/dataproxy-sdk/java.md
@@ -12,14 +12,14 @@ Create a task on the Dashboard or through the command line,
and use `Auto Push`
The library of the SDK need to be imported into the project before using the
SDK. The library can be obtained in the following two ways:
- Get the source code and compile it yourself and deploy the SDK package to
the local warehouse, see [How to
Build](https://inlong.apache.org/docs/next/development/how_to_build/).
- Imported through maven dependency like this:
-<pre><code parentName="pre">
-{`<dependency>
+
+```xml
+<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>dataproxy-sdk</artifactId>
<version>${siteVariables.inLongVersion}</version>
</dependency>
-`}
-</code></pre>
+```
## Data report process
After import the SDK, you can instantiate a
[TcpMsgSender](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java)
object, call sync(`sendMessage()`) or async(`asyncSendMessage()`) interface to
report single or multiple(batch) data. see
[TcpClientExample.java](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.
[...]
@@ -28,7 +28,7 @@ The overall process includes the following three steps:
### Initialize SDK
From the demo code, we can see that the client initialization is mainly done
in the `getMessageSender()` function:
```java
- public TcpMsgSender getMessageSender(MsgSenderFactory senderFactory,
boolean visitMgrByHttps,
+public TcpMsgSender getMessageSender(MsgSenderFactory senderFactory, boolean
visitMgrByHttps,
String managerAddr, String
managerPort, String inlongGroupId, int msgType,
boolean useLocalMetaConfig, String
configBasePath) {
TcpMsgSender messageSender = null;
diff --git a/docs/sdk/dataproxy-sdk/python.md b/docs/sdk/dataproxy-sdk/python.md
index c190f2e5da0..1e6d6c84a9a 100644
--- a/docs/sdk/dataproxy-sdk/python.md
+++ b/docs/sdk/dataproxy-sdk/python.md
@@ -22,7 +22,7 @@ Your system's Python site-packages directory is: xxx/xxx
Enter the target directory for the .so files (Press Enter to use the default
site-packages directory):
```
After the build process is completed, you can import the package in your
Python project and use the InLong DataProxy SDK to report data.
-```shell
+```python
import inlong_dataproxy
```
> **Note**: When the C++ SDK or the Python version you are using is updated,
> you need to rebuild it following the steps above.
diff --git a/docs/sdk/manager-sdk/example.md b/docs/sdk/manager-sdk/example.md
index ab37bc67c91..0cebd4d703b 100644
--- a/docs/sdk/manager-sdk/example.md
+++ b/docs/sdk/manager-sdk/example.md
@@ -13,14 +13,13 @@ which means you can use client to manipulate your group
task instead of UI.
## Dependency
- Add Maven Dependency
-<pre><code parentName="pre">
-{`<dependency>
+```xml
+<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>manager-client</artifactId>
<version>${siteVariables.inLongVersion}</version>
</dependency>
-`}
-</code></pre>
+```
## Code
diff --git a/docs/sdk/tubemq-sdk/cpp.md b/docs/sdk/tubemq-sdk/cpp.md
index dbba229f0bb..6648ca27b82 100644
--- a/docs/sdk/tubemq-sdk/cpp.md
+++ b/docs/sdk/tubemq-sdk/cpp.md
@@ -15,7 +15,6 @@ mkdir build && cd build
cmake ..
make -j8 # the thread num is based on the cpu cores
-
```
The building can also be completed in
[docker](https://github.com/apache/inlong/tree/master/inlong-tubemq/tubemq-docker/tubemq-cpp)
environment provided by InLong.
diff --git a/docusaurus.config.js b/docusaurus.config.js
index 6668825015d..71f91bde231 100644
--- a/docusaurus.config.js
+++ b/docusaurus.config.js
@@ -218,6 +218,7 @@ const darkCodeTheme =
require('prism-react-renderer/themes/dracula');
prism: {
theme: require('prism-react-renderer/themes/dracula'),
darkTheme: darkCodeTheme,
+ additionalLanguages:
['java','bash','docker','protobuf','sql','properties','yaml','typescript',
'ini', 'nginx'],
},
algolia: {
appId: 'YUW9QEL53E',
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/agent.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/agent.md
index 853f5f5be97..b93d0f38b9c 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/agent.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/agent.md
@@ -131,16 +131,16 @@ SenderManager 内部直接调用 DataProxy SDK 进行数据发送,需要 3 个
```java
private class AgentSenderCallback implements SendMessageCallback {
- private final SenderMessage message;
+ private final SenderMessage message;
- AgentSenderCallback(SenderMessage message, int retry) {
- this.message = message;
- }
-
- @Override
- public void onMessageAck(SendResult result) {
- ...
- }
+ AgentSenderCallback(SenderMessage message, int retry) {
+ this.message = message;
+ }
+
+ @Override
+ public void onMessageAck(SendResult result) {
+ ...
+ }
}
```
回调对象 onMessageAck 方法会携带发送结果,返回成功后遍历 SenderMessage::offsetAckList,将
OffsetAckInfo::hasAck 设
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/audit_msg.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/audit_msg.md
index 4e80ef507e3..0cc2722268c 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/audit_msg.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/audit_msg.md
@@ -161,7 +161,7 @@ Audit Store 从消息队列( MQ )中消费 `AuditData` 审计数据,对其
### ClickHouse 表 Schema
-```clickhouse
+```sql
CREATE TABLE apache_inlong_audit.audit_data
(
`log_ts` DateTime COMMENT 'Log timestamp',
@@ -179,12 +179,11 @@ CREATE TABLE apache_inlong_audit.audit_data
`size` Int64 COMMENT 'Message size',
`delay` Int64 COMMENT 'Message delay',
`update_time` DateTime COMMENT 'Update time'
-)
- ENGINE = ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}',
'{replica}')
- PARTITION BY toDate(log_ts)
- ORDER BY (log_ts, audit_id, inlong_group_id, inlong_stream_id,
audit_tag, audit_version, ip)
- TTL toDateTime(log_ts) + toIntervalDay(8)
- SETTINGS index_granularity = 8192
+) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}',
'{replica}')
+ PARTITION BY toDate(log_ts)
+ ORDER BY (log_ts, audit_id, inlong_group_id, inlong_stream_id, audit_tag,
audit_version, ip)
+ TTL toDateTime(log_ts) + toIntervalDay(8)
+ SETTINGS index_granularity = 8192
```
如上所述,该表采用 ReplicatedMergeTree 存储引擎,以实现分布式存储和高可用性。数据将根据 log_ts 列进行分区,并按照(
log_ts,
@@ -192,7 +191,7 @@ audit_id, inlong_group_id, inlong_stream_id, audit_tag,
audit_version, ip )的
### MySQL 表 Schema
-```mysql
+```sql
CREATE TABLE IF NOT EXISTS `audit_data`
(
`id` int(32) NOT NULL PRIMARY KEY AUTO_INCREMENT
COMMENT 'Incremental primary key',
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/inlong_msg.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/inlong_msg.md
index c872df69007..0e5973899ea 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/inlong_msg.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/inlong_msg.md
@@ -20,11 +20,11 @@ InLongMsg 是自定义格式的二进制数据包,由前后各 2 个字节的
Magic 字段在 InLongMsg 的当前实现里一共有 4 个有效值,分别标识 Payload 部分可携带的 4 种不同的数据版本(MAGIC0
为无效值):
```java
- private static final byte[] MAGIC0 = {(byte) 0xf, (byte) 0x0};
- private static final byte[] MAGIC1 = {(byte) 0xf, (byte) 0x1};
- private static final byte[] MAGIC2 = {(byte) 0xf, (byte) 0x2};
- private static final byte[] MAGIC3 = {(byte) 0xf, (byte) 0x3};
- private static final byte[] MAGIC4 = {(byte) 0xf, (byte) 0x4};
+private static final byte[] MAGIC0 = {(byte) 0xf, (byte) 0x0};
+private static final byte[] MAGIC1 = {(byte) 0xf, (byte) 0x1};
+private static final byte[] MAGIC2 = {(byte) 0xf, (byte) 0x2};
+private static final byte[] MAGIC3 = {(byte) 0xf, (byte) 0x3};
+private static final byte[] MAGIC4 = {(byte) 0xf, (byte) 0x4};
```
Payload 部分根据上述 Magic 字段的定义携带对应格式的数据内容,这些内容不论采用什么样的格式最终都映射为用户按照 {属性集合,单条数据},或者
{属性集合,多条数据} 上报的原始数据信息。
@@ -154,14 +154,13 @@ AttrDataCnt 接下来的信息则逐条存储 {属性,数据} 对信息
### 增加 maven 依赖
-<pre><code parentName="pre">
-{`<dependency>
+```xml
+<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>inlong-common</artifactId>
<version>${siteVariables.inLongVersion}</version>
</dependency>
-`}
-</code></pre>
+```
### 增加解析逻辑
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/tubemq_binary.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/tubemq_binary.md
index 09639fccd7d..4b6d1e316e3 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/tubemq_binary.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/binary_protocol/tubemq_binary.md
@@ -175,13 +175,13 @@ message RegisterResponseM2P {
- clientId:标识 Producer 对象,该 ID 值在 Producer 启动时构造并在 Producer 生命周期内有效,目前 Java
版本的 SDK 构造规则是:
```java
- ClientId = consumerGroup + "_"
- + AddressUtils.getLocalAddress() + "_" // 本机IP (IPV4)
- + pid + "_" // 进程ID
- + timestamp + "_" // 时间戳
- + counter + "_" // 自增计数器
- + consumerType + "_" // 消费者类型,包含 Pull 和 Push 两种类型
- + clientVersion; // 客户端版本号
+ ClientId = consumerGroup + "_"
+ + AddressUtils.getLocalAddress() + "_" // 本机IP (IPV4)
+ + pid + "_" // 进程ID
+ + timestamp + "_" // 时间戳
+ + counter + "_" // 自增计数器
+ + consumerType + "_" // 消费者类型,包含 Pull 和 Push 两种类型
+ + clientVersion; // 客户端版本号
```
建议其他语言增加如上标记,以便于问题排查;
@@ -202,17 +202,17 @@ message RegisterResponseM2P {
- brokerInfos:Broker 元数据信息,该字段里主要是 Master 反馈给 Producer 的整个集群的 Broker
信息列表;其格式如下:
```java
- public BrokerInfo(String strBrokerInfo, int brokerPort) {
- String[] strBrokers =
- strBrokerInfo.split(TokenConstants.ATTR_SEP);
- this.brokerId = Integer.parseInt(strBrokers[0]);
- this.host = strBrokers[1];
- this.port = brokerPort;
- if (!TStringUtils.isBlank(strBrokers[2])) {
- this.port = Integer.parseInt(strBrokers[2]);
- }
- this.buildStrInfo();
- }
+ public BrokerInfo(String strBrokerInfo, int brokerPort) {
+ String[] strBrokers =
+ strBrokerInfo.split(TokenConstants.ATTR_SEP);
+ this.brokerId = Integer.parseInt(strBrokers[0]);
+ this.host = strBrokers[1];
+ this.port = brokerPort;
+ if (!TStringUtils.isBlank(strBrokers[2])) {
+ this.port = Integer.parseInt(strBrokers[2]);
+ }
+ this.buildStrInfo();
+ }
```
- authorizedInfo:Master 提供的授权信息,格式如下:
@@ -354,22 +354,21 @@ message SendMessageResponseB2P {
- data:Message 的二进制字节流信息,实现如下:
- ```java
- private byte[] encodePayload(final Message message) {
- final byte[] payload = message.getData();
- final String attribute = message.getAttribute();
- if (TStringUtils.isBlank(attribute)) {
- return payload;
- }
- byte[] attrData = StringUtils.getBytesUtf8(attribute);
- final ByteBuffer buffer =
- ByteBuffer.allocate(4 + attrData.length + payload.length);
- buffer.putInt(attrData.length);
- buffer.put(attrData);
- buffer.put(payload);
- return buffer.array();
- }
- ```
+```java
+private byte[] encodePayload(final Message message) {
+ final byte[] payload = message.getData();
+ final String attribute = message.getAttribute();
+ if (TStringUtils.isBlank(attribute)) {
+ return payload;
+ }
+ byte[] attrData = StringUtils.getBytesUtf8(attribute);
+    final ByteBuffer buffer = ByteBuffer.allocate(4 + attrData.length + payload.length);
+ buffer.putInt(attrData.length);
+ buffer.put(attrData);
+ buffer.put(payload);
+ return buffer.array();
+}
+```
- sentAddr:SDK 所在本机的 IPv4,这里将IP地址转为 32 位的数字 ID;
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_agent/agent.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_agent/agent.md
index f6572862041..cf1d8e6c0e3 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_agent/agent.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_agent/agent.md
@@ -153,24 +153,24 @@ public class PulsarTask {
## 位点控制
```java
- protected class SourceData {
+protected class SourceData {
- private byte[] data;
- private Long offset;
- }
+ private byte[] data;
+ private Long offset;
+}
```
```java
- protected List<SourceData> readFromSource() {
- return null;
- }
+protected List<SourceData> readFromSource() {
+ return null;
+}
```
我们可以看到,Source 读取数据时每一条数据都会记录其对应的 Offset,这个 Offset 最终在 Sink 端写入成功后才会由 Agent
自动记录。
而在 Source 初始化时会自动读取其对应的 Offset,保存在 AbstractSource 的成员变量 offsetProfile,通过
offsetProfile.getOffset() 可以
获得其 Offset 用于初始化数据源。
```java
- protected void initOffset() {
- offsetProfile = OffsetManager.getInstance().getOffset(taskId,
instanceId);
- }
+protected void initOffset() {
+ offsetProfile = OffsetManager.getInstance().getOffset(taskId, instanceId);
+}
```
## 测试
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_dashboard/how_to_write_plugin_dashboard.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_dashboard/how_to_write_plugin_dashboard.md
index 5b32cad523d..238d5a0476a 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_dashboard/how_to_write_plugin_dashboard.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_dashboard/how_to_write_plugin_dashboard.md
@@ -43,7 +43,7 @@ InLong Dashboard 本身作为前端控制台,采用 React 框架构建。
下面是一个基本示例,在该插件中,实现了与 backend 通信的一个类,包含3个字段(username, password,
format)。其中,`BasicInfo` 来源于各自不同的基础类型类。
-```ts
+```typescript
import { DataWithBackend } from '@/metas/DataWithBackend';
import { RenderRow } from '@/metas/RenderRow';
import { RenderList } from '@/metas/RenderList';
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_manager/inlong_manager_shiro_plugin.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_manager/inlong_manager_shiro_plugin.md
index 3f71ad87fd4..4f8e729022c 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_manager/inlong_manager_shiro_plugin.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_manager/inlong_manager_shiro_plugin.md
@@ -9,20 +9,19 @@ import {siteVariables} from '../../version';
Inlong Manager中使用了Apache Shiro框架实现了认证和授权等功能,Manager已经集成好了默认的实现逻辑,如果您想在Inlong
Manager中实现自定义的基于Shiro的认证和授权功能,您可以按照如下的说明,进行相关功能的插件化开发。
## 依赖
-- 增加maven 依赖
-<pre><code parentName="pre">
-{`<dependency>
+- 添加maven 依赖
+```xml
+<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>manager-common</artifactId>
<version>${siteVariables.inLongVersion}</version>
</dependency>
-`}
-</code></pre>
+```
## 编码
- 实现其中关于Shiro相关模块的接口
```java
-org.apache.inlong.manager.common.auth.InlongShiro
+package org.apache.inlong.manager.common.auth.InlongShiro
public interface InlongShiro {
@@ -51,6 +50,6 @@ public class InlongShiroImpl implements InlongShiro {
```
- 修改manager-web module下application.properties文件中配置
-```java
+```properties
inlong.auth.type=Custom
```
\ No newline at end of file
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/custom_flink_metrics.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/custom_flink_metrics.md
index a4655bb0ea2..90d9c66ebe7 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/custom_flink_metrics.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/custom_flink_metrics.md
@@ -16,15 +16,19 @@ InLong Sort 框架允许用户在不同的 Connector 中定义和插入自定义
首先,需要在 `SourceExactlyMetric` 或 `SinkExactlyMetric` 类中添加新的 Flink Metric
对象。Metric 对象通常可以是 `Counter`、`Gauge`、`Meter` 等类型。在本例中,选择创建一个用于记录反序列化错误次数的
`Counter`,并将其作为类的成员变量:
- private Counter numDeserializeError;
+```java
+private Counter numDeserializeError;
+```
### 2. 编写 `registerMetricsForXXX` 方法
为了初始化并注册此 Metric 对象,需要编写一个 `registerMetricsForNumDeserializeError` 方法。在此方法中,通过
`registerCounter` 将该 `Counter` 对象注册到 Flink 的 Metric 系统,以便系统能够跟踪此 Metric。
- public void registerMetricsForNumDeserializeError(Counter counter) {
- numDeserializeError = registerCounter("numDeserializeError", counter);
- }
+```java
+public void registerMetricsForNumDeserializeError(Counter counter) {
+ numDeserializeError = registerCounter("numDeserializeError", counter);
+}
+```
在该方法中,通过调用 `registerCounter` 方法,将自定义的 `Counter` 对象与 Flink 的 Metric 系统相关联,并确保此
Metric 能够在后续的数据处理过程中被正确记录。
@@ -32,11 +36,13 @@ InLong Sort 框架允许用户在不同的 Connector 中定义和插入自定义
在类的构造函数中,根据传入的 `MetricOption` 和 `MetricGroup`
参数,调用上述编写的注册方法。这样可以确保在实例化时,Metric对象被正确初始化和注册:
- public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) {
- this.metricGroup = metricGroup;
- this.labels = option.getLabels();
- registerMetricsForNumDeserializeError(new ThreadSafeCounter());
- }
+```java
+public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) {
+ this.metricGroup = metricGroup;
+ this.labels = option.getLabels();
+ registerMetricsForNumDeserializeError(new ThreadSafeCounter());
+}
+```
通过在构造函数中调用 `registerMetricsForNumDeserializeError` 方法,确保 `numDeserializeError`
计数器在每次实例化时都已初始化,并准备好在数据处理过程中记录反序列化错误。
@@ -44,11 +50,13 @@ InLong Sort 框架允许用户在不同的 Connector 中定义和插入自定义
为了在外部能够操作 `numDeserializeError` 计数器,还需编写相应的getter和操作方法。在本例中,为
`numDeserializeError` 计数器提供一个增加计数的方法 `incNumDeserializeError`
,以便在反序列化过程发生异常时调用此方法递增计数:
- public void incNumDeserializeError() {
- if (numDeserializeError != null) {
- numDeserializeError.inc();
- }
+```java
+public void incNumDeserializeError() {
+ if (numDeserializeError != null) {
+ numDeserializeError.inc();
}
+}
+```
通过编写该操作方法,可以确保在反序列化出现错误时能够调用 `incNumDeserializeError` 递增错误计数。
@@ -56,31 +64,35 @@ InLong Sort 框架允许用户在不同的 Connector 中定义和插入自定义
为了便于调试和监控以及保证代码完整性,需要在 `toString` 方法中增加该自定义 Metric 的输出信息:
- @Override
- public String toString() {
- return "SourceMetricData{"
- + ", numDeserializeError=" + numDeserializeError.getCount()
- + "}";
- }
+```java
+@Override
+public String toString() {
+ return "SourceMetricData{"
+ + ", numDeserializeError=" + numDeserializeError.getCount()
+ + "}";
+}
+```
### 6. 在合适位置插入自定义 Metric
在 Metric 类中完成注册和初始化后,需要在合适的逻辑节点中调用该 Metric。在本例中,在反序列化方法中调用
`incNumDeserializeError` 方法,以记录每次反序列化错误的发生。此操作可以通过以下代码实现:
- @Override
- public void deserialize(SourceRecord record, Collector<RowData> out)
throws Exception {
- try {
- // 执行反序列化逻辑
- } catch (Exception e) {
- // 反序列化失败时递增错误计数
- // 必须检查sourceExactlyMetric是否为空
- if(sourceExactlyMetric != null) {
+```java
+@Override
+public void deserialize(SourceRecord record, Collector<RowData> out) throws
Exception {
+ try {
+ // Execute deserialization logic
+ } catch (Exception e) {
+ // Increment error count on deserialization failure
+ // Ensure sourceExactlyMetric is not null
+ if(sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeError();
- }
- throw e;
}
+ throw e;
}
+}
+```
在反序列化过程中,通过调用 `incNumDeserializeError` 方法,确保每次反序列化失败时都能增加错误计数,从而准确反映反序列化错误的频率。
@@ -88,40 +100,39 @@ InLong Sort 框架允许用户在不同的 Connector 中定义和插入自定义
使用 `sort-end-to-end-tests` 测试,位于 `inlong-sort/sort-end-to-end-tests/` 目录下。
1. **设置 SQL 中的 Metric 标签**:在测试SQL文件中增加 `inlong.metric.labels` 标签,确保 Flink 能识别
Metric 标签:
以
`sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Postgres2StarRocksTest.java`
中的
`sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres_test.sql`
为例:
- ```sql
- CREATE TABLE test_input1 (
- `id` INT primary key,
- name STRING,
- description STRING
-) WITH (
- 'connector' = 'postgres-cdc-inlong',
- 'hostname' = 'postgres',
- 'port' = '5432',
- 'username' = 'flinkuser',
- 'password' = 'flinkpw',
- 'database-name' = 'test',
- 'table-name' = 'test_input1',
- 'schema-name' = 'public',
- 'decoding.plugin.name' = 'pgoutput',
- 'slot.name' = 'inlong_slot',
- 'debezium.slot.name' = 'inlong_slot',
- -- 增加的部分
- 'inlong.metric.labels' = 'groupId=pggroup&streamId=pgStream&nodeId=pgNode'
-
- -- Sink 部分保持不变
-);
- ```
- 剩余 Flink SQL 保持不变即可
-
+ ```sql
+ CREATE TABLE test_input1 (
+ `id` INT primary key,
+ name STRING,
+ description STRING
+ ) WITH (
+ 'connector' = 'postgres-cdc-inlong',
+ 'hostname' = 'postgres',
+ 'port' = '5432',
+ 'username' = 'flinkuser',
+ 'password' = 'flinkpw',
+ 'database-name' = 'test',
+ 'table-name' = 'test_input1',
+ 'schema-name' = 'public',
+ 'decoding.plugin.name' = 'pgoutput',
+ 'slot.name' = 'inlong_slot',
+ 'debezium.slot.name' = 'inlong_slot',
+ -- Added portion
+ 'inlong.metric.labels' =
'groupId=pggroup&streamId=pgStream&nodeId=pgNode'
+ );
+
+    -- 剩余 Flink SQL 保持不变即可
+ ```
+
2. **配置日志输出查看 Metric **:在测试环境配置中启用 Metric 日志输出,以便可以在控制台中看到统计结果:
- ```properties
- metrics.reporter.slf4j.class:
org.apache.flink.metrics.slf4j.Slf4jReporter
- metrics.reporter.slf4j.interval: 5 SECONDS
- ```
+ ```properties
+ metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
+ metrics.reporter.slf4j.interval: 5 SECONDS
+ ```
3. **运行 end-to-end
测试并验证输出**:使用以下命令在`inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15`运行指定的
end-to-end 测试,并在控制台中查看 `numDeserializeError` 的值是否为预期值。:
```bash
- mvn test -Dtest=Postgres2StarRocksTest
+ mvn test -Dtest=Postgres2StarRocksTest
```
提示:可以考虑添加一些逻辑或构造一些数据,触发`incDeserializeError()`,以确认 Metric 运作正常
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/extension_connector.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/extension_connector.md
index 59012fb2d8d..78344974d03 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/extension_connector.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/extension_connector.md
@@ -72,7 +72,7 @@
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Extra
```
在实现的 ExtractNode 中指定 connector;
-```Java
+```java
// 继承 ExtractNode 类,实现具体的类,例如 MongoExtractNode
@EqualsAndHashCode(callSuper = true)
@JsonTypeName("MongoExtract")
@@ -267,7 +267,7 @@ Inlong Sort 将指标上报的流程封装在了 `org.apache.inlong.sort.base.me
通常的做法是在构造 Source/Sink 时传递例如 InLong Audit 地址,在初始化 Source/Sink 算子调用 open()
方法时进行初始化 `SourceExactlyMetric/SinkExactlyMetric` 对象。
在处理实际数据后再调用对应的审计上报方法。
-```
+```java
public class StarRocksDynamicSinkFunctionV2<T> extends
StarRocksDynamicSinkFunctionBase<T> {
private static final long serialVersionUID = 1L;
@@ -322,5 +322,6 @@ public class StarRocksDynamicSinkFunctionV2<T> extends
StarRocksDynamicSinkFunct
sinkExactlyMetric.invoke(1, getDataSize(value),
schemaUtils.getDataTime(data));
}
}
+}
```
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/offline_data_sync.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/offline_data_sync.md
index 0663f77379b..e865f539059 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/offline_data_sync.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_sort/offline_data_sync.md
@@ -22,12 +22,12 @@ Apache InLong 通过流批统一的 Flink SQL API 实现一套代码同时支持
Flink 的 Source 中提供了接口来设置数据边界的接口:
```java
- /**
- * Get the boundedness of this source.
- *
- * @return the boundedness of this source.
- */
- Boundedness getBoundedness();
+/**
+ * Get the boundedness of this source.
+ *
+ * @return the boundedness of this source.
+ */
+Boundedness getBoundedness();
```
Boundedness 是一个枚举类型,有两个值:
```java
@@ -70,20 +70,20 @@ public abstract class ExtractNode implements Node {
`PulsarExtractNode` 中会将 Boundaries 信息配置到 Pulsar Connector 的相关参数中:
```java
@Override
- public void fillInBoundaries(Boundaries boundaries) {
- super.fillInBoundaries(boundaries);
- BoundaryType boundaryType = boundaries.getBoundaryType();
- String lowerBoundary = boundaries.getLowerBound();
- String upperBoundary = boundaries.getUpperBound();
- if (Objects.requireNonNull(boundaryType) == BoundaryType.TIME) {
- // 设置时间边界
- sourceBoundaryOptions.put("source.start.publish-time",
lowerBoundary);
- sourceBoundaryOptions.put("source.stop.at-publish-time",
upperBoundary);
- log.info("Filled in source boundaries options");
- } else {
- log.warn("Not supported boundary type: {}", boundaryType);
- }
+public void fillInBoundaries(Boundaries boundaries) {
+ super.fillInBoundaries(boundaries);
+ BoundaryType boundaryType = boundaries.getBoundaryType();
+ String lowerBoundary = boundaries.getLowerBound();
+ String upperBoundary = boundaries.getUpperBound();
+ if (Objects.requireNonNull(boundaryType) == BoundaryType.TIME) {
+ // set time boundaries
+ sourceBoundaryOptions.put("source.start.publish-time", lowerBoundary);
+        sourceBoundaryOptions.put("source.stop.at-publish-time", upperBoundary);
+        log.info("Filled in source boundaries options");
+ } else {
+ log.warn("Not supported boundary type: {}", boundaryType);
}
+}
```
These options are picked up by the PulsarSource; when the PulsarSource is initialized, a `BoundedStopCursor` is set for the Source:
```java
@@ -109,10 +109,10 @@ public ScanRuntimeProvider
getScanRuntimeProvider(ScanContext context) {
If a `BoundedStopCursor` is configured, the Source's `boundedness` attribute is set to `Boundedness.BOUNDED`.
```java
public PulsarSourceBuilder<OUT> setBoundedStopCursor(StopCursor stopCursor) {
- this.boundedness = Boundedness.BOUNDED;
- this.stopCursor = checkNotNull(stopCursor);
- return this;
- }
+ this.boundedness = Boundedness.BOUNDED;
+ this.stopCursor = checkNotNull(stopCursor);
+ return this;
+}
```
In this way the Flink engine knows that this is a bounded Source and can process the data in batch mode; an illustrative sketch of the two boundedness cases follows.
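To make the distinction concrete, here is an illustrative sketch (not InLong code; `source` is a hypothetical Source instance) of how a caller could branch on the two Boundedness values:
```java
// Boundedness has exactly two values: BOUNDED and CONTINUOUS_UNBOUNDED
Boundedness boundedness = source.getBoundedness();
if (boundedness == Boundedness.BOUNDED) {
    // bounded source: the job can be scheduled in batch mode and stops at the upper boundary
} else {
    // Boundedness.CONTINUOUS_UNBOUNDED: regular, continuously running streaming execution
}
```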
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_transform/transform_udf.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_transform/transform_udf.md
index 702b2f470ea..186e6c38edc 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_transform/transform_udf.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/extension_transform/transform_udf.md
@@ -122,14 +122,14 @@ public class AdditionParser implements ValueParser {
Add a parameterized constructor and the related member variables to the parser class; in the constructor, parse the input expression and convert it into the corresponding typed objects, taking `AdditionParser` as an example:
```java
- private final ValueParser left;
+private final ValueParser left;
- private final ValueParser right;
+private final ValueParser right;
- public AdditionParser(Addition expr) {
- this.left = OperatorTools.buildParser(expr.getLeftExpression());
- this.right = OperatorTools.buildParser(expr.getRightExpression());
- }
+public AdditionParser(Addition expr) {
+ this.left = OperatorTools.buildParser(expr.getLeftExpression());
+ this.right = OperatorTools.buildParser(expr.getRightExpression());
+}
```
## Implement the parsing logic
@@ -137,54 +137,54 @@ public class AdditionParser implements ValueParser {
Override the `parse` method. If the parser needs to further process the typed objects obtained in the previous step, implement the corresponding logic in this method; otherwise simply return those objects directly, taking `AdditionParser` as an example:
```java
- @Override
- public Object parse(SourceData sourceData, int rowIndex, Context context) {
- if (this.left instanceof IntervalParser && this.right instanceof
IntervalParser) {
- return null;
- } else if (this.left instanceof IntervalParser || this.right
instanceof IntervalParser) {
- IntervalParser intervalParser = null;
- ValueParser dateParser = null;
- if (this.left instanceof IntervalParser) {
- intervalParser = (IntervalParser) this.left;
- dateParser = this.right;
- } else {
- intervalParser = (IntervalParser) this.right;
- dateParser = this.left;
- }
- Object intervalPairObj = intervalParser.parse(sourceData,
rowIndex, context);
- Object dateObj = dateParser.parse(sourceData, rowIndex, context);
- if (intervalPairObj == null || dateObj == null) {
- return null;
- }
- return DateUtil.dateAdd(OperatorTools.parseString(dateObj),
- (Pair<Integer, Map<ChronoField, Long>>) intervalPairObj,
1);
+@Override
+public Object parse(SourceData sourceData, int rowIndex, Context context) {
+    if (this.left instanceof IntervalParser && this.right instanceof IntervalParser) {
+        return null;
+    } else if (this.left instanceof IntervalParser || this.right instanceof IntervalParser) {
+ IntervalParser intervalParser = null;
+ ValueParser dateParser = null;
+ if (this.left instanceof IntervalParser) {
+ intervalParser = (IntervalParser) this.left;
+ dateParser = this.right;
} else {
- return numericalOperation(sourceData, rowIndex, context);
+ intervalParser = (IntervalParser) this.right;
+ dateParser = this.left;
+ }
+        Object intervalPairObj = intervalParser.parse(sourceData, rowIndex, context);
+ Object dateObj = dateParser.parse(sourceData, rowIndex, context);
+ if (intervalPairObj == null || dateObj == null) {
+ return null;
}
+ return DateUtil.dateAdd(OperatorTools.parseString(dateObj),
+ (Pair<Integer, Map<ChronoField, Long>>) intervalPairObj, 1);
+ } else {
+ return numericalOperation(sourceData, rowIndex, context);
}
+}
```
## Add unit test code
Every parser class needs unit tests to verify that its logic is correct. The unit test classes live in this [directory](https://github.com/apache/inlong/tree/master/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/parser). All unit test functions for a parser are placed in a single test class, named in the format `Test + parser name + Parser`, e.g. `TestAdditionParser`:
```java
- @Test
- public void testAdditionParser() throws Exception {
- String transformSql = null;
- TransformConfig config = null;
- TransformProcessor<String, String> processor = null;
- List<String> output = null;
-
- transformSql = "select numeric1 + numeric2 from source";
- config = new TransformConfig(transformSql);
- processor = TransformProcessor
- .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
- SinkEncoderFactory.createKvEncoder(kvSink));
- // case1: 1 + 10
- output = processor.transform("1|10||||", new HashMap<>());
- Assert.assertEquals(1, output.size());
- Assert.assertEquals("result=11", output.get(0));
- }
+@Test
+public void testAdditionParser() throws Exception {
+ String transformSql = null;
+ TransformConfig config = null;
+ TransformProcessor<String, String> processor = null;
+ List<String> output = null;
+
+ transformSql = "select numeric1 + numeric2 from source";
+ config = new TransformConfig(transformSql);
+ processor = TransformProcessor
+ .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ // case1: 1 + 10
+ output = processor.transform("1|10||||", new HashMap<>());
+ Assert.assertEquals(1, output.size());
+ Assert.assertEquals("result=11", output.get(0));
+}
```
After the steps above, congratulations, you have completed a new parser class and can submit your code to the community. The complete `AdditionParser` code can be found at this [link](https://github.com/apache/inlong/blob/master/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java)
@@ -204,10 +204,7 @@ public class AdditionParser implements ValueParser {
public class AndOperator implements ExpressionOperator {
@Override
- public boolean check(SourceData sourceData, int rowIndex, Context context)
{
-
- }
-
+    public boolean check(SourceData sourceData, int rowIndex, Context context) {}
}
```
Add the corresponding `@TransformOperator` annotation to the logical operator class; the operator class needs to implement the `ExpressionOperator` interface and override its `check` method.
@@ -241,43 +238,49 @@ public boolean check(SourceData sourceData, int rowIndex,
Context context) {
## Add unit test code
Every logical operator class needs unit tests to verify that its logic is correct. The unit test classes live in this [directory](https://github.com/apache/inlong/tree/master/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/operator). All unit test functions for an operator are placed in a single test class, named in the format `Test + operator name + Operator`, e.g. `TestAndOperator`:
```java
- public void testAndOperator() throws Exception {
- String transformSql = "select if((string2 < 4) and (numeric4 > 5),1,0)
from source";
- TransformConfig config = new TransformConfig(transformSql);
- // case1: "3.14159265358979323846|3a|4|4"
- TransformProcessor<String, String> processor = TransformProcessor
- .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
- SinkEncoderFactory.createKvEncoder(kvSink));
- List<String> output1 =
processor.transform("3.14159265358979323846|3a|4|4");
- Assert.assertEquals(1, output1.size());
- Assert.assertEquals(output1.get(0), "result=0");
- // case2: "3.14159265358979323846|5|4|8"
- List<String> output2 =
processor.transform("3.14159265358979323846|5|4|8");
- Assert.assertEquals(1, output1.size());
- Assert.assertEquals(output2.get(0), "result=0");
- // case3: "3.14159265358979323846|3|4|8"
- List<String> output3 =
processor.transform("3.14159265358979323846|3|4|8");
- Assert.assertEquals(1, output1.size());
- Assert.assertEquals(output3.get(0), "result=1");
-
- transformSql = "select if((numeric3 < 4) and (numeric4 > 5),1,0) from
source";
- config = new TransformConfig(transformSql);
- // case4: "3.14159265358979323846|4|4|8"
- processor = TransformProcessor
- .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
- SinkEncoderFactory.createKvEncoder(kvSink));
- List<String> output4 =
processor.transform("3.14159265358979323846|4|4|8");
- Assert.assertEquals(1, output1.size());
- Assert.assertEquals(output4.get(0), "result=0");
- // case5: "3.14159265358979323846|4|3.2|4"
- List<String> output5 =
processor.transform("3.14159265358979323846|4|3.2|4");
- Assert.assertEquals(1, output1.size());
- Assert.assertEquals(output5.get(0), "result=0");
- // case6: "3.14159265358979323846|4|3.2|8"
- List<String> output6 =
processor.transform("3.14159265358979323846|4|3.2|8");
- Assert.assertEquals(1, output1.size());
- Assert.assertEquals(output6.get(0), "result=1");
- }
+public void testAndOperator() throws Exception {
+    String transformSql = "select if((string2 < 4) and (numeric4 > 5),1,0) from source";
+ TransformConfig config = new TransformConfig(transformSql);
+
+ // case1: "3.14159265358979323846|3a|4|4"
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+    List<String> output1 = processor.transform("3.14159265358979323846|3a|4|4");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "result=0");
+
+ // case2: "3.14159265358979323846|5|4|8"
+ List<String> output2 = processor.transform("3.14159265358979323846|5|4|8");
+    Assert.assertEquals(1, output2.size());
+ Assert.assertEquals(output2.get(0), "result=0");
+
+ // case3: "3.14159265358979323846|3|4|8"
+ List<String> output3 = processor.transform("3.14159265358979323846|3|4|8");
+    Assert.assertEquals(1, output3.size());
+ Assert.assertEquals(output3.get(0), "result=1");
+
+    transformSql = "select if((numeric3 < 4) and (numeric4 > 5),1,0) from source";
+ config = new TransformConfig(transformSql);
+
+ // case4: "3.14159265358979323846|4|4|8"
+ processor = TransformProcessor
+ .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output4 = processor.transform("3.14159265358979323846|4|4|8");
+    Assert.assertEquals(1, output4.size());
+ Assert.assertEquals(output4.get(0), "result=0");
+
+ // case5: "3.14159265358979323846|4|3.2|4"
+    List<String> output5 = processor.transform("3.14159265358979323846|4|3.2|4");
+    Assert.assertEquals(1, output5.size());
+ Assert.assertEquals(output5.get(0), "result=0");
+
+ // case6: "3.14159265358979323846|4|3.2|8"
+    List<String> output6 = processor.transform("3.14159265358979323846|4|3.2|8");
+    Assert.assertEquals(1, output6.size());
+ Assert.assertEquals(output6.get(0), "result=1");
+}
```
After the steps above, congratulations, you have completed a new logical operator class and can submit your code to the community. The complete `AndOperator` code can be found at this [link](https://github.com/apache/inlong/blob/master/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java)
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/how_to_build.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/how_to_build.md
index 4972f7b7056..54ac337eef2 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/how_to_build.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/development/how_to_build.md
@@ -9,11 +9,11 @@ sidebar_position: 1
- Java [JDK 8](https://adoptopenjdk.net/?variant=openjdk8)
- Maven 3.6.1+
-```
+```shell
$ mvn clean install -DskipTests
```
(Optional) Build with Docker:
-```
+```shell
$ docker pull maven:3.6-openjdk-8
$ docker run -v `pwd`:/inlong -w /inlong maven:3.6-openjdk-8 mvn clean install -DskipTests
```
@@ -33,6 +33,6 @@ inlong-dashboard
## Build Docker Images
- [Docker](https://docs.docker.com/engine/install/) 19.03.1+
-```
+```shell
mvn clean package -DskipTests -Pdocker
```
\ No newline at end of file
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/cpp.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/cpp.md
index 4b7cfbcf406..97c14f507fc 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/cpp.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/cpp.md
@@ -20,19 +20,19 @@ import {siteVariables} from '../../version';
### Create an SDK instance
The SDK supports creating a single SDK instance per process, which is safe for multi-threaded use; a process can also create multiple SDK instances, each independent of the others and each thread-safe as well:
- Create the SDK instance object
-```
+```cpp
InLongApi inlong_api
```
- Initialize the instance object; the configuration file uses JSON format, see the [configuration file description](#附录:配置文件说明)
-```
+```cpp
// Initialize the SDK; the argument is the path of the configuration file; a return value of zero means initialization succeeded
int32_t result = inlong_api.InitApi("/home/conf/config.json");
```
### Call the send interface to report data
The SDK supports single-record (recommended) and batch sending; both are asynchronous, and the data reporting interface is thread-safe. Before reporting data, a callback function can be set to handle records that fail to send. The callback signature is as follows:
-```
+```cpp
int32_t CallBackFunc(const char* inlong_group_id, const char* inlong_stream_id,
const char* msg, int32_t msg_len,
const int64_t report_time,
@@ -40,16 +40,16 @@ int32_t CallBackFunc(const char* inlong_group_id, const
char* inlong_stream_id,
```
- Single-record data reporting interface
-```
+```cpp
// Return value: zero means the send succeeded, non-zero means failure; see SDKInvalidReuslt in tc_api.h for the specific error codes
- int32_t Send(const char *inlong_group_id, const char *inlong_stream_id,
- const char *msg, int32_t msg_len,
- UserCallBack call_back = nullptr)
+int32_t Send(const char *inlong_group_id, const char *inlong_stream_id,
+ const char *msg, int32_t msg_len,
+ UserCallBack call_back = nullptr)
```
### Close the SDK
Call the close interface to shut down the SDK:
-```
+```cpp
// A return value of zero means the SDK was closed successfully; data can no longer be reported afterwards
// max_waitms: maximum number of milliseconds to wait before closing the SDK, so that data buffered inside the SDK can finish sending
int32_t CloseApi(int32_t max_waitms);
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/go.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/go.md
index 5423db89650..731aad9250e 100755
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/go.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/go.md
@@ -22,7 +22,7 @@ import
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-go
```go
client, err := dataproxy.NewClient(
dataproxy.WithGroupID("test"),
-dataproxy.WithURL("http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList"),
+    dataproxy.WithURL("http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList"),
dataproxy.WithMetricsName("test"),
)
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/java.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/java.md
index 6b2e5a55d35..d1ce0059b55 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/java.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/java.md
@@ -12,14 +12,14 @@ import {siteVariables} from '../../version';
需要在项目中包含 SDK 的头文件和库,进行 SDK 的使用。头文件和库提供以下两种获取方式:
- Build from source yourself and deploy the SDK package to your local repository, see [How to Build](https://inlong.apache.org/zh-CN/docs/next/development/how_to_build);
- Directly reference the existing artifact from the Apache repository, see:
-<pre><code parentName="pre">
-{`<dependency>
+
+```xml
+<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>dataproxy-sdk</artifactId>
<version>${siteVariables.inLongVersion}</version>
</dependency>
-`}
-</code></pre>
+```
## Data Reporting Process
After importing the SDK, instantiate a [TcpMsgSender](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java) interface object, then call the corresponding synchronous (sendMessage()) or asynchronous (asyncSendMessage()) interface to report one record or multiple (batched) records. For a sending demo, see [TcpClientExample.java](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java).
@@ -28,7 +28,7 @@ import {siteVariables} from '../../version';
### Initialize the SDK
From the demo code we can see that client initialization is mainly done in the `getMessageSender()` function:
```java
- public TcpMsgSender getMessageSender(MsgSenderFactory senderFactory,
boolean visitMgrByHttps,
+public TcpMsgSender getMessageSender(MsgSenderFactory senderFactory, boolean visitMgrByHttps,
        String managerAddr, String managerPort, String inlongGroupId, int msgType,
        boolean useLocalMetaConfig, String configBasePath) {
TcpMsgSender messageSender = null;
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/python.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/python.md
index 9a277b62412..2df7c9fab79 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/python.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/python.md
@@ -22,7 +22,7 @@ Your system's Python site-packages directory is: xxx/xxx
Enter the target directory for the .so files (Press Enter to use the default
site-packages directory):
```
After the build process finishes, you can import the package in your Python project and use the InLong DataProxy SDK to report data.
-```shell
+```python
import inlong_dataproxy
```
> **Note**: When the C++ SDK or the Python version you are using is updated, you need to rebuild it by following the steps above.
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/manager-sdk/example.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/manager-sdk/example.md
index da9bf8afa1b..45b0e00325f 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/manager-sdk/example.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/manager-sdk/example.md
@@ -11,15 +11,14 @@ Apache InLong Manager 是整个数据集成平台面向用户的统一管理入
## Dependencies
-- 增加 maven 依赖
-<pre><code parentName="pre">
-{`<dependency>
+- Add the maven dependency
+```xml
+<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>manager-client</artifactId>
<version>${siteVariables.inLongVersion}</version>
</dependency>
-`}
-</code></pre>
+```
## Code
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/tubemq-sdk/cpp.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/tubemq-sdk/cpp.md
index d7d1019d139..99c1806b7c7 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/tubemq-sdk/cpp.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/tubemq-sdk/cpp.md
@@ -138,7 +138,6 @@ if (result) {
// stop the consumer
consumer.ShutDown();
-
```
### Example