This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ec1740e3ca [INLONG-10093][SDK] Support to transform from CSV protocol
to CSV/KV protocol by single SQL (#10097)
ec1740e3ca is described below
commit ec1740e3ca269956b60024e098cba7a81c603be2
Author: 卢春亮 <[email protected]>
AuthorDate: Mon Apr 29 18:21:01 2024 +0800
[INLONG-10093][SDK] Support to transform from CSV protocol to CSV/KV
protocol by single SQL (#10097)
* [INLONG-10093][SDK] Support to transform from CSV protocol to CSV/KV
protocol by single SQL
---
inlong-sdk/pom.xml | 1 +
inlong-sdk/transform-sdk/pom.xml | 102 ++++
.../inlong/sdk/transform/decode/CsvSourceData.java | 60 +++
.../sdk/transform/decode/CsvSourceDecoder.java | 88 ++++
.../inlong/sdk/transform/decode/KvSourceData.java | 59 +++
.../sdk/transform/decode/KvSourceDecoder.java | 74 +++
.../inlong/sdk/transform/decode/KvUtils.java | 579 +++++++++++++++++++++
.../inlong/sdk/transform/decode/SourceData.java | 30 ++
.../inlong/sdk/transform/decode/SourceDecoder.java | 30 ++
.../inlong/sdk/transform/decode/SplitUtils.java | 164 ++++++
.../sdk/transform/encode/CsvSinkEncoder.java | 77 +++
.../sdk/transform/encode/DefaultSinkData.java | 60 +++
.../inlong/sdk/transform/encode/EscapeUtils.java | 83 +++
.../inlong/sdk/transform/encode/KvSinkEncoder.java | 64 +++
.../inlong/sdk/transform/encode/SinkData.java | 33 ++
.../inlong/sdk/transform/encode/SinkEncoder.java | 26 +
.../inlong/sdk/transform/pojo/CsvSinkInfo.java | 99 ++++
.../inlong/sdk/transform/pojo/CsvSourceInfo.java | 99 ++++
.../inlong/sdk/transform/pojo/FieldInfo.java | 29 ++
.../inlong/sdk/transform/pojo/JsonSourceInfo.java | 56 ++
.../inlong/sdk/transform/pojo/KvSinkInfo.java | 58 +++
.../inlong/sdk/transform/pojo/KvSourceInfo.java | 59 +++
.../inlong/sdk/transform/pojo/PbSourceInfo.java | 78 +++
.../inlong/sdk/transform/pojo/ProtocolType.java | 61 +++
.../apache/inlong/sdk/transform/pojo/SinkInfo.java | 87 ++++
.../inlong/sdk/transform/pojo/SourceInfo.java | 69 +++
.../inlong/sdk/transform/pojo/TransformConfig.java | 96 ++++
.../sdk/transform/process/TransformProcessor.java | 160 ++++++
.../transform/process/operator/AndOperator.java | 49 ++
.../process/operator/EqualsToOperator.java | 51 ++
.../process/operator/ExpressionOperator.java | 28 +
.../operator/GreaterThanEqualsOperator.java | 53 ++
.../process/operator/GreaterThanOperator.java | 53 ++
.../process/operator/MinorThanEqualsOperator.java | 53 ++
.../process/operator/MinorThanOperator.java | 53 ++
.../process/operator/NotEqualsToOperator.java | 51 ++
.../transform/process/operator/NotOperator.java | 47 ++
.../transform/process/operator/OperatorTools.java | 81 +++
.../sdk/transform/process/operator/OrOperator.java | 49 ++
.../process/operator/ParenthesisOperator.java | 47 ++
.../sdk/transform/process/parser/ColumnParser.java | 47 ++
.../sdk/transform/process/parser/LongParser.java | 46 ++
.../sdk/transform/process/parser/StringParser.java | 46 ++
.../sdk/transform/process/parser/ValueParser.java | 29 ++
.../sdk/transform/pojo/TestTransformConfig.java | 113 ++++
.../transform/process/TestTransformProcessor.java | 128 +++++
46 files changed, 3505 insertions(+)
diff --git a/inlong-sdk/pom.xml b/inlong-sdk/pom.xml
index f9dd979680..11c51d1aec 100644
--- a/inlong-sdk/pom.xml
+++ b/inlong-sdk/pom.xml
@@ -33,6 +33,7 @@
<module>sdk-common</module>
<module>sort-sdk</module>
<module>dataproxy-sdk</module>
+ <module>transform-sdk</module>
</modules>
<properties>
diff --git a/inlong-sdk/transform-sdk/pom.xml b/inlong-sdk/transform-sdk/pom.xml
new file mode 100644
index 0000000000..aeb3e100b0
--- /dev/null
+++ b/inlong-sdk/transform-sdk/pom.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>inlong-sdk</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>transform-sdk</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+
+ <name>Apache InLong - Transform SDK</name>
+
+ <properties>
+ <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>inlong-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sdk-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>3.1.0</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.2.0</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>${plugin.compile.version}</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${plugin.surefire.version}</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>3.0.2</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-install-plugin</artifactId>
+ <version>2.5.2</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>${plugin.deploy.version}</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.7.1</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+ <version>3.0.0</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
+</project>
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
new file mode 100644
index 0000000000..77f173f1a6
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * In-memory {@link SourceData} holding the rows decoded from a CSV message.
+ *
+ * <p>Rows are appended with {@link #addRow()} and then filled with
+ * {@link #putField(String, String)}; every row maps a field name to its raw string value.</p>
+ */
+public class CsvSourceData implements SourceData {
+
+    private final List<Map<String, String>> rows = new ArrayList<>();
+
+    /** The row most recently started by {@link #addRow()}; {@code null} until the first call. */
+    private Map<String, String> currentRow;
+
+    public CsvSourceData() {
+    }
+
+    /**
+     * Stores a field in the current row, replacing any previous value with the same name.
+     *
+     * <p>{@link #addRow()} must have been called at least once before this method is used.</p>
+     *
+     * @param fieldName the field name
+     * @param fieldValue the field value, may be {@code null}
+     */
+    public void putField(String fieldName, String fieldValue) {
+        this.currentRow.put(fieldName, fieldValue);
+    }
+
+    /**
+     * Starts a new empty row; subsequent {@link #putField(String, String)} calls fill it.
+     */
+    public void addRow() {
+        this.currentRow = new HashMap<>();
+        rows.add(currentRow);
+    }
+
+    /**
+     * @return the number of rows started so far
+     */
+    @Override
+    public int getRowCount() {
+        return this.rows.size();
+    }
+
+    /**
+     * Looks up a field of a row.
+     *
+     * @param rowNum the row number, counted from 1 by this implementation
+     * @param fieldName the field name
+     * @return the field value, or {@code null} when the row number is out of range or the row
+     *         has no such field
+     */
+    @Override
+    public String getField(int rowNum, String fieldName) {
+        // Guard both ends of the range: without the lower bound, rowNum <= 0 would reach
+        // rows.get(-1) and throw IndexOutOfBoundsException instead of returning null.
+        if (rowNum < 1 || rowNum > this.rows.size()) {
+            return null;
+        }
+        return this.rows.get(rowNum - 1).get(fieldName);
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
new file mode 100644
index 0000000000..22e3b9ba13
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decodes CSV text into a {@link CsvSourceData}.
+ *
+ * <p>The field delimiter (default {@code '|'}), the optional escape character and the charset
+ * (default: the platform charset) are taken from the {@link CsvSourceInfo}. When the source info
+ * declares fields, columns are bound to those field names by position; otherwise columns are
+ * exposed as {@code $1}, {@code $2}, ... using {@link SourceData#FIELD_DEFAULT_PREFIX}.</p>
+ */
+public class CsvSourceDecoder implements SourceDecoder {
+
+    protected CsvSourceInfo sourceInfo;
+    private Charset srcCharset = Charset.defaultCharset();
+    private Character delimiter = '|';
+    private Character escapeChar = null;
+    private List<FieldInfo> fields;
+
+    /**
+     * Creates a decoder from the source configuration.
+     *
+     * @param sourceInfo the CSV source configuration; only the first character of its delimiter
+     *        and escape strings is used
+     */
+    public CsvSourceDecoder(CsvSourceInfo sourceInfo) {
+        this.sourceInfo = sourceInfo;
+        // isEmpty rather than isBlank: a tab or a space is a legitimate delimiter, and isBlank
+        // would silently discard it and fall back to the default.
+        if (!StringUtils.isEmpty(sourceInfo.getDelimiter())) {
+            this.delimiter = sourceInfo.getDelimiter().charAt(0);
+        }
+        if (!StringUtils.isEmpty(sourceInfo.getEscapeChar())) {
+            this.escapeChar = sourceInfo.getEscapeChar().charAt(0);
+        }
+        if (!StringUtils.isBlank(sourceInfo.getCharset())) {
+            this.srcCharset = Charset.forName(sourceInfo.getCharset());
+        }
+        this.fields = sourceInfo.getFields();
+    }
+
+    /**
+     * Decodes raw bytes using the configured charset.
+     *
+     * @param srcBytes the raw message bytes
+     * @param extParams extension parameters, passed through to the string overload
+     * @return the decoded rows
+     */
+    @Override
+    public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
+        String srcString = new String(srcBytes, srcCharset);
+        return this.decode(srcString, extParams);
+    }
+
+    /**
+     * Decodes CSV text; rows are separated by {@code '\n'} and {@code '"'} is the quote character.
+     *
+     * @param srcString the CSV text
+     * @param extParams extension parameters, not used by this decoder
+     * @return the decoded rows; declared fields missing from a short row are stored as {@code null}
+     */
+    @Override
+    public SourceData decode(String srcString, Map<String, Object> extParams) {
+        String[][] rowValues = SplitUtils.splitCsv(srcString, delimiter, escapeChar, '\"', '\n', true);
+        CsvSourceData sourceData = new CsvSourceData();
+        boolean hasSchema = fields != null && !fields.isEmpty();
+        for (String[] fieldValues : rowValues) {
+            sourceData.addRow();
+            if (!hasSchema) {
+                // No schema configured: expose the columns positionally as $1, $2, ...
+                for (int j = 0; j < fieldValues.length; j++) {
+                    String fieldName = SourceData.FIELD_DEFAULT_PREFIX + (j + 1);
+                    // Index by the column (j); indexing by the row number picked the wrong
+                    // value or ran past the end of the row.
+                    sourceData.putField(fieldName, fieldValues[j]);
+                }
+                continue;
+            }
+            int fieldIndex = 0;
+            for (FieldInfo field : fields) {
+                String fieldValue = null;
+                if (fieldIndex < fieldValues.length) {
+                    fieldValue = fieldValues[fieldIndex];
+                }
+                sourceData.putField(field.getName(), fieldValue);
+                fieldIndex++;
+            }
+        }
+        return sourceData;
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
new file mode 100644
index 0000000000..3e3f600197
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * In-memory {@link SourceData} holding the rows decoded from a key-value message.
+ *
+ * <p>Rows are appended with {@link #addRow()} and then filled with
+ * {@link #putField(String, String)}; every row maps a field name to its raw string value.</p>
+ */
+public class KvSourceData implements SourceData {
+
+    private final List<Map<String, String>> rows = new ArrayList<>();
+
+    /** The row most recently started by {@link #addRow()}; {@code null} until the first call. */
+    private Map<String, String> currentRow;
+
+    public KvSourceData() {
+    }
+
+    /**
+     * Stores a field in the current row, replacing any previous value with the same name.
+     *
+     * <p>{@link #addRow()} must have been called at least once before this method is used.</p>
+     *
+     * @param fieldName the field name
+     * @param fieldValue the field value, may be {@code null}
+     */
+    public void putField(String fieldName, String fieldValue) {
+        this.currentRow.put(fieldName, fieldValue);
+    }
+
+    /**
+     * Starts a new empty row; subsequent {@link #putField(String, String)} calls fill it.
+     */
+    public void addRow() {
+        this.currentRow = new HashMap<>();
+        rows.add(currentRow);
+    }
+
+    /**
+     * @return the number of rows started so far
+     */
+    @Override
+    public int getRowCount() {
+        return this.rows.size();
+    }
+
+    /**
+     * Looks up a field of a row.
+     *
+     * @param rowNum the row number, counted from 1 by this implementation
+     * @param fieldName the field name
+     * @return the field value, or {@code null} when the row number is out of range or the row
+     *         has no such field
+     */
+    @Override
+    public String getField(int rowNum, String fieldName) {
+        // Guard both ends of the range: without the lower bound, rowNum <= 0 would reach
+        // rows.get(-1) and throw IndexOutOfBoundsException instead of returning null.
+        if (rowNum < 1 || rowNum > this.rows.size()) {
+            return null;
+        }
+        return this.rows.get(rowNum - 1).get(fieldName);
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
new file mode 100644
index 0000000000..03b40c9f1c
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decodes key-value text into a {@link KvSourceData}.
+ *
+ * <p>Entries are separated by {@code '&'}, keys and values by {@code '='}, rows by
+ * {@code '\n'}; {@code '\\'} is the escape character and {@code '"'} the quote character.
+ * When the source info declares fields, only those fields are copied from each row (missing
+ * ones as {@code null}); otherwise every parsed entry is copied.</p>
+ */
+public class KvSourceDecoder implements SourceDecoder {
+
+    protected KvSourceInfo sourceInfo;
+    private Charset srcCharset = Charset.defaultCharset();
+    private List<FieldInfo> fields;
+
+    /**
+     * Creates a decoder from the source configuration.
+     *
+     * @param sourceInfo the KV source configuration providing the charset and the field list
+     */
+    public KvSourceDecoder(KvSourceInfo sourceInfo) {
+        this.sourceInfo = sourceInfo;
+        String charsetName = sourceInfo.getCharset();
+        if (StringUtils.isNotBlank(charsetName)) {
+            this.srcCharset = Charset.forName(charsetName);
+        }
+        this.fields = sourceInfo.getFields();
+    }
+
+    /**
+     * Decodes raw bytes using the configured charset.
+     *
+     * @param srcBytes the raw message bytes
+     * @param extParams extension parameters, passed through to the string overload
+     * @return the decoded rows
+     */
+    @Override
+    public SourceData decode(byte[] srcBytes, Map<String, Object> extParams) {
+        return this.decode(new String(srcBytes, srcCharset), extParams);
+    }
+
+    /**
+     * Decodes key-value text, one row per line.
+     *
+     * @param srcString the key-value text
+     * @param extParams extension parameters, not used by this decoder
+     * @return the decoded rows
+     */
+    @Override
+    public SourceData decode(String srcString, Map<String, Object> extParams) {
+        List<Map<String, String>> parsedRows = KvUtils.splitKv(srcString, '&', '=', '\\', '\"', '\n');
+        KvSourceData result = new KvSourceData();
+        boolean copyEverything = fields == null || fields.size() == 0;
+        for (Map<String, String> parsedRow : parsedRows) {
+            result.addRow();
+            if (copyEverything) {
+                // No schema configured: keep every entry found in the text.
+                parsedRow.forEach(result::putField);
+            } else {
+                for (FieldInfo declared : fields) {
+                    String name = declared.getName();
+                    result.putField(name, parsedRow.get(name));
+                }
+            }
+        }
+        return result;
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java
new file mode 100644
index 0000000000..60c804c48f
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvUtils.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utilities for splitting and concatenating key-value (KV) and CSV text.
+ */
+public class KvUtils {
+
+ private static final int STATE_NORMAL = 0;
+ private static final int STATE_KEY = 2;
+ private static final int STATE_VALUE = 4;
+ private static final int STATE_ESCAPING = 8;
+ private static final int STATE_QUOTING = 16;
+
+    /**
+     * Splits a single line of kv text.
+     *
+     * <p>Delegates to the six-argument overload without a line delimiter and returns the first
+     * row of its result.</p>
+     *
+     * @param text The text to be split.
+     * @param entryDelimiter The delimiter of entries.
+     * @param kvDelimiter The delimiter between key and value.
+     * @param escapeChar The escaping character, or {@code null} to disable escaping.
+     * @param quoteChar The quoting character, or {@code null} to disable quoting.
+     * @return The entries split from the text; an empty map when nothing was parsed.
+     */
+    public static Map<String, String> splitKv(
+            @Nonnull String text,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar) {
+        List<Map<String, String>> rows = splitKv(text, entryDelimiter, kvDelimiter, escapeChar, quoteChar, null);
+        if (!rows.isEmpty()) {
+            return rows.get(0);
+        }
+        return new HashMap<>();
+    }
+
+    /**
+     * Splits kv text, which may contain multiple lines, into one map per line.
+     *
+     * <p>Both escaping and quoting are supported. When {@code escapeChar} is not {@code null},
+     * the single character following it is taken literally. When {@code quoteChar} is not
+     * {@code null}, all characters between a pair of quote characters are taken literally
+     * (the escape character included) and the quote characters themselves are dropped.</p>
+     *
+     * <p>Parsing is lenient towards malformed text:</p>
+     * <ul>
+     * <li>a key without a kv delimiter is stored with an empty value;</li>
+     * <li>a kv delimiter inside a value is kept as part of the value;</li>
+     * <li>a dangling escape character or an unclosed quote at the end of the text is ignored
+     * and the pending text completes the key or value it belongs to;</li>
+     * <li>a line delimiter met while reading a key is dropped.</li>
+     * </ul>
+     *
+     * <p>The text remaining at the end of input always produces a last entry and a last row,
+     * so the returned list is never empty.</p>
+     *
+     * @param text The text to be split.
+     * @param entryDelimiter The delimiter of entries.
+     * @param kvDelimiter The delimiter between key and value.
+     * @param escapeChar The escaping character, or {@code null} to disable escaping.
+     * @param quoteChar The quoting character, or {@code null} to disable quoting.
+     * @param lineDelimiter The line delimiter character, or {@code null} for single-line text.
+     * @return The entries of every line, in order of appearance.
+     */
+    public static List<Map<String, String>> splitKv(
+            @Nonnull String text,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable Character lineDelimiter) {
+        Map<String, String> fields = new HashMap<>();
+        List<Map<String, String>> lines = new ArrayList<>();
+
+        StringBuilder stringBuilder = new StringBuilder();
+
+        String key = "";
+
+        int state = STATE_KEY;
+
+        /*
+         * The state (STATE_KEY or STATE_VALUE) that was active when escaping or quoting started.
+         * It is restored when escaping or quoting ends.
+         */
+        int kvState = STATE_KEY;
+
+        for (int i = 0; i < text.length(); ++i) {
+            char ch = text.charAt(i);
+
+            if (state == STATE_ESCAPING) {
+                // Exactly one character is escaped, whatever it is. Ordinary characters and line
+                // delimiters must end the escape too, otherwise a later delimiter would be
+                // swallowed or the parser would be left in a state no branch handles.
+                stringBuilder.append(ch);
+                state = kvState;
+                continue;
+            }
+
+            if (state == STATE_QUOTING) {
+                if (quoteChar != null && ch == quoteChar) {
+                    state = kvState;
+                } else {
+                    stringBuilder.append(ch);
+                }
+                continue;
+            }
+
+            // From here on the state is either STATE_KEY or STATE_VALUE.
+            if (ch == kvDelimiter) {
+                if (state == STATE_KEY) {
+                    key = stringBuilder.toString();
+                    stringBuilder.setLength(0);
+                    state = STATE_VALUE;
+                } else {
+                    // A kv delimiter inside a value is part of the value; stay in the value
+                    // instead of falling back to whatever kvState last recorded.
+                    stringBuilder.append(ch);
+                }
+            } else if (ch == entryDelimiter) {
+                if (state == STATE_KEY) {
+                    // Key without a value.
+                    key = stringBuilder.toString();
+                    fields.put(key, "");
+                } else {
+                    fields.put(key, stringBuilder.toString());
+                }
+                stringBuilder.setLength(0);
+                state = STATE_KEY;
+            } else if (escapeChar != null && ch == escapeChar) {
+                kvState = state;
+                state = STATE_ESCAPING;
+            } else if (quoteChar != null && ch == quoteChar) {
+                kvState = state;
+                state = STATE_QUOTING;
+            } else if (lineDelimiter != null && ch == lineDelimiter) {
+                if (state == STATE_VALUE) {
+                    fields.put(key, stringBuilder.toString());
+                    // Copy the row: the working map is cleared and reused for the next line.
+                    lines.add(new HashMap<>(fields));
+                    stringBuilder.setLength(0);
+                    fields.clear();
+                    state = STATE_KEY;
+                }
+                // While reading a key the line delimiter is simply dropped.
+            } else {
+                stringBuilder.append(ch);
+            }
+        }
+
+        // A dangling escape or an unclosed quote is tolerated: finish in the state it interrupted,
+        // so that pending value text is stored as a value and pending key text as a key.
+        if (state == STATE_ESCAPING || state == STATE_QUOTING) {
+            state = kvState;
+        }
+
+        if (state == STATE_VALUE) {
+            fields.put(key, stringBuilder.toString());
+        } else {
+            // Dangling key (or empty text): stored with an empty value.
+            fields.put(stringBuilder.toString(), "");
+        }
+        lines.add(fields);
+        return lines;
+    }
+
+    /**
+     * Joins keys and values into kv text of the form {@code k1=v1&k2=v2} (with the given
+     * delimiters).
+     *
+     * <p>Special characters inside keys and values are escaped or quoted when the
+     * corresponding character is given; otherwise an exception is thrown.</p>
+     *
+     * @param fieldKeys The keys to be joined.
+     * @param fieldValues The values to be joined, one per key.
+     * @param entryDelimiter The delimiter of entries.
+     * @param kvDelimiter The delimiter between key and value.
+     * @param escapeChar The escape character, may be {@code null}.
+     * @param quoteChar The quote character, may be {@code null}.
+     * @return The joined text of the given fields.
+     * @throws IllegalArgumentException if the number of keys and values differ, or a special
+     *         character cannot be encoded with the given escape/quote characters.
+     */
+    public static String concatKv(
+            @Nonnull String[] fieldKeys,
+            @Nonnull String[] fieldValues,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar) {
+        final int entryCount = fieldKeys.length;
+        if (entryCount != fieldValues.length) {
+            throw new IllegalArgumentException("The keys' number " + fieldKeys.length
+                    + " doesn't match values' number " + fieldValues.length);
+        }
+
+        Collection<Character> specialChars = Arrays.asList(entryDelimiter, kvDelimiter);
+        StringBuilder joined = new StringBuilder();
+
+        int position = 0;
+        while (position < entryCount) {
+            if (position > 0) {
+                // Separate this entry from the previous one.
+                joined.append(entryDelimiter);
+            }
+            encodeText(joined, fieldKeys[position], specialChars, escapeChar, quoteChar);
+            joined.append(kvDelimiter);
+            encodeText(joined, fieldValues[position], specialChars, escapeChar, quoteChar);
+            position++;
+        }
+
+        return joined.toString();
+    }
+
+    /**
+     * Appends {@code text} to the builder, protecting delimiter, escape and quote characters.
+     *
+     * <p>With an escape character every special character is prefixed by it. Without one, a
+     * delimiter is wrapped in quote characters when a quote character is given; any other case
+     * is rejected.</p>
+     *
+     * @throws IllegalArgumentException if a special character cannot be encoded
+     */
+    private static void encodeText(
+            StringBuilder stringBuilder,
+            String text,
+            Collection<Character> delimiters,
+            Character escapeChar,
+            Character quoteChar) {
+        for (char ch : text.toCharArray()) {
+            boolean isDelimiter = delimiters.contains(ch);
+            boolean isEscape = escapeChar != null && ch == escapeChar;
+            boolean isQuote = quoteChar != null && ch == quoteChar;
+
+            if (!isDelimiter && !isEscape && !isQuote) {
+                stringBuilder.append(ch);
+                continue;
+            }
+
+            if (escapeChar != null) {
+                // Escaping handles every kind of special character.
+                stringBuilder.append(escapeChar);
+                stringBuilder.append(ch);
+                continue;
+            }
+
+            // No escape character available from here on.
+            if (isDelimiter) {
+                if (quoteChar == null) {
+                    throw new IllegalArgumentException("There is a delimiter in the text, "
+                            + "but neither escape nor quote character is specified.");
+                }
+                stringBuilder.append(quoteChar);
+                stringBuilder.append(ch);
+                stringBuilder.append(quoteChar);
+            } else {
+                // Only a quote character can reach this point, and it cannot quote itself.
+                throw new IllegalArgumentException("There is a quote character in the text, "
+                        + "but escape character is not specified.");
+            }
+        }
+    }
+
+    /**
+     * Splits a single line of csv text.
+     *
+     * <p>Delegates to the overload taking a line delimiter, passing {@code null} for it, and
+     * returns the first row of the result.</p>
+     *
+     * @see KvUtils#splitCsv(String, Character, Character, Character, Character, boolean)
+     *
+     * @param text The text to be split.
+     * @param delimiter The delimiter of fields.
+     * @param escapeChar The escaping character, or {@code null} to disable escaping.
+     * @param quoteChar The quoting character, or {@code null} to disable quoting.
+     * @return The fields of the line; an empty array when nothing was parsed.
+     */
+    public static String[] splitCsv(
+            @Nonnull String text,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar) {
+        String[][] rows = splitCsv(text, delimiter, escapeChar, quoteChar, null);
+        if (rows.length > 0) {
+            return rows[0];
+        }
+        return new String[0];
+    }
+
+    /**
+     * Splits csv text that may span several lines, keeping leading delimiters.
+     *
+     * <p>Equivalent to the six-argument overload with {@code deleteHeadDelimiter} set to
+     * {@code false}.</p>
+     *
+     * @see KvUtils#splitCsv(String, Character, Character, Character, Character, boolean)
+     *
+     * @param text The text to be split.
+     * @param delimiter The delimiter of fields.
+     * @param escapeChar The escaping character, or {@code null} to disable escaping.
+     * @param quoteChar The quoting character, or {@code null} to disable quoting.
+     * @param lineDelimiter The delimiter between lines, e.g. '\n'.
+     * @return The split rows, each holding its fields.
+     */
+    public static String[][] splitCsv(
+            @Nonnull String text,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable Character lineDelimiter) {
+        String[][] rows = splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter, false);
+        return rows;
+    }
+
+ /**
+ * Splits csv text, which may contain multiple lines of data.
+ *
+ * <p>Both escaping and quoting are supported. When {@code escapeChar} is not
+ * {@code null}, the character following it is taken literally. When
+ * {@code quoteChar} is not {@code null}, all characters between a pair of
+ * quote characters are taken literally (the escape character included) and
+ * the quote characters themselves are dropped.</p>
+ *
+ * <p>The text remaining at the end of input always forms a last field and a
+ * last row, so a trailing line delimiter yields a final row holding one
+ * empty field.</p>
+ *
+ * @param text The text to be split.
+ * @param delimiter The delimiter of fields.
+ * @param escapeChar The escaping character, or {@code null} to disable escaping.
+ * @param quoteChar The quoting character, or {@code null} to disable quoting.
+ * @param lineDelimiter The delimiter between lines, e.g. '\n'; {@code null}
+ * treats the whole text as one line.
+ * @param deleteHeadDelimiter If true, a delimiter met while the current line
+ * has no collected field and no pending characters is ignored.
+ * @return A 2-D String array representing the parsed data, where the 1st
+ * dimension is row and the 2nd dimension is column.
+ * @throws IllegalArgumentException if the text ends right after an escape
+ * character or inside an unclosed quote.
+ */
+ public static String[][] splitCsv(
+ @Nonnull String text,
+ @Nonnull Character delimiter,
+ @Nullable Character escapeChar,
+ @Nullable Character quoteChar,
+ @Nullable Character lineDelimiter,
+ boolean deleteHeadDelimiter) {
+ // Completed rows, and the fields collected so far for the current row.
+ List<String[]> lines = new ArrayList<>();
+ List<String> fields = new ArrayList<>();
+
+ // Characters of the field currently being read.
+ StringBuilder stringBuilder = new StringBuilder();
+ int state = STATE_NORMAL;
+
+ for (int i = 0; i < text.length(); ++i) {
+ char ch = text.charAt(i);
+
+ if (ch == delimiter) {
+ switch (state) {
+ case STATE_NORMAL:
+ // Leading delimiter(s) of a line are skipped on request.
+ if (deleteHeadDelimiter && fields.isEmpty()
+ && stringBuilder.length() == 0) {
+ break;
+ }
+ // The delimiter closes the current field.
+ String field = stringBuilder.toString();
+ fields.add(field);
+ stringBuilder.setLength(0);
+ break;
+ case STATE_ESCAPING:
+ // Escaped delimiter: literal character, escape is consumed.
+ stringBuilder.append(ch);
+ state = STATE_NORMAL;
+ break;
+ case STATE_QUOTING:
+ stringBuilder.append(ch);
+ break;
+ default:
+ break;
+ }
+ } else if (escapeChar != null && ch == escapeChar) {
+ switch (state) {
+ case STATE_NORMAL:
+ // Start of an escape sequence; the escape character itself is dropped.
+ state = STATE_ESCAPING;
+ break;
+ case STATE_ESCAPING:
+ // Escaped escape character.
+ stringBuilder.append(ch);
+ state = STATE_NORMAL;
+ break;
+ case STATE_QUOTING:
+ // Inside quotes the escape character has no special meaning.
+ stringBuilder.append(ch);
+ break;
+ default:
+ break;
+ }
+ } else if (quoteChar != null && ch == quoteChar) {
+ switch (state) {
+ case STATE_NORMAL:
+ // Opening quote, not part of the field.
+ state = STATE_QUOTING;
+ break;
+ case STATE_ESCAPING:
+ stringBuilder.append(ch);
+ state = STATE_NORMAL;
+ break;
+ case STATE_QUOTING:
+ // Closing quote, not part of the field.
+ state = STATE_NORMAL;
+ break;
+ default:
+ break;
+ }
+ } else if (lineDelimiter != null && ch == lineDelimiter) {
+ switch (state) {
+ case STATE_NORMAL:
+ // End of row: flush the pending field and start a new row.
+ String field = stringBuilder.toString();
+ fields.add(field);
+ lines.add(fields.toArray(new String[0]));
+
+ stringBuilder.setLength(0);
+ fields.clear();
+ break;
+ case STATE_ESCAPING:
+ stringBuilder.append(ch);
+ state = STATE_NORMAL;
+ break;
+ case STATE_QUOTING:
+ stringBuilder.append(ch);
+ break;
+ default:
+ break;
+ }
+ } else {
+ // Ordinary character; it also terminates a pending escape.
+ if (state == STATE_ESCAPING) {
+ state = STATE_NORMAL;
+ }
+ stringBuilder.append(ch);
+ }
+ }
+
+ switch (state) {
+ case STATE_NORMAL:
+ // End of input closes the last field and the last row.
+ String field = stringBuilder.toString();
+ fields.add(field);
+ lines.add(fields.toArray(new String[0]));
+
+ String[][] result = new String[lines.size()][];
+ for (int i = 0; i < lines.size(); ++i) {
+ result[i] = lines.get(i);
+ }
+ return result;
+
+ case STATE_ESCAPING:
+ throw new IllegalArgumentException(String.format("Not closed
escaping. Text=[%s].", text));
+ case STATE_QUOTING:
+ throw new IllegalArgumentException(String.format("Not closed
quoting. Text=[%s].", text));
+ default:
+ throw new IllegalStateException(String.format("Text=[%s].",
text));
+ }
+ }
+
+    /**
+     * Joins the given fields into one line of csv text.
+     *
+     * <p>Delimiter, escape and quote characters inside a field are escaped when an escape
+     * character is given; otherwise they are wrapped in quote characters when a quote
+     * character is given and the character is not the quote character itself. Any other case
+     * is rejected.</p>
+     *
+     * @param fields The fields to be joined.
+     * @param delimiter The delimiter of fields.
+     * @param escapeChar The escape character, may be {@code null}.
+     * @param quoteChar The quote character, may be {@code null}.
+     * @return The joined text of the given fields.
+     * @throws IllegalArgumentException if a special character cannot be encoded with the
+     *         given escape/quote characters.
+     */
+    public static String concatCsv(
+            @Nonnull String[] fields,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar) {
+        StringBuilder joined = new StringBuilder();
+
+        for (int index = 0; index < fields.length; ++index) {
+            if (index > 0) {
+                // Separate this field from the previous one.
+                joined.append(delimiter);
+            }
+
+            for (char ch : fields[index].toCharArray()) {
+                boolean special = ch == delimiter
+                        || (escapeChar != null && ch == escapeChar)
+                        || (quoteChar != null && ch == quoteChar);
+                if (!special) {
+                    joined.append(ch);
+                    continue;
+                }
+
+                if (escapeChar != null) {
+                    joined.append(escapeChar);
+                    joined.append(ch);
+                } else if (quoteChar != null && ch != quoteChar) {
+                    joined.append(quoteChar);
+                    joined.append(ch);
+                    joined.append(quoteChar);
+                } else {
+                    throw new IllegalArgumentException("There exist special characters in the text, "
+                            + "but neither escape character nor quote character is configured.");
+                }
+            }
+        }
+
+        return joined.toString();
+    }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
new file mode 100644
index 0000000000..2c39948f2d
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+/**
+ * SourceData
+ * <p>
+ * Read access to decoded source content, addressed by row number and field name.
+ * </p>
+ */
+public interface SourceData {
+
+ // Prefix constant for field names; presumably used for fields that have no
+ // configured name - TODO confirm against the decoder implementations.
+ String FIELD_DEFAULT_PREFIX = "$";
+
+ /**
+ * get row count
+ * @return the number of rows held by this source data
+ */
+ int getRowCount();
+
+ /**
+ * get field
+ * NOTE(review): the index base of rowNum and the result for an unknown row or
+ * field are decided by the implementation; confirm before relying on them.
+ * @param rowNum the row to read from
+ * @param fieldName the name of the field to read
+ * @return the value of the field in the given row
+ */
+ String getField(int rowNum, String fieldName);
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
new file mode 100644
index 0000000000..a11cd89351
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import java.util.Map;
+
+/**
+ * SourceDecoder
+ * <p>
+ * Turns raw source content, given either as bytes or as text, into a {@link SourceData}.
+ * </p>
+ */
+public interface SourceDecoder {
+
+ /**
+ * decode
+ * @param srcBytes the raw source content as bytes
+ * @param extParams extra parameters for decoding; their keys and meaning are
+ * defined by the implementation (not visible here)
+ * @return the decoded source data
+ */
+ SourceData decode(byte[] srcBytes, Map<String, Object> extParams);
+
+ /**
+ * decode
+ * @param srcString the raw source content as text
+ * @param extParams extra parameters for decoding; their keys and meaning are
+ * defined by the implementation (not visible here)
+ * @return the decoded source data
+ */
+ SourceData decode(String srcString, Map<String, Object> extParams);
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SplitUtils.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SplitUtils.java
new file mode 100644
index 0000000000..f29af8f93a
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SplitUtils.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * SplitUtils
+ * <p>
+ * Splits delimited text into fields, and optionally into lines of fields, with a
+ * character-by-character state machine that understands an escape character and a
+ * quote character.
+ * </p>
+ */
+public class SplitUtils {
+
+ // Parser states. Only STATE_NORMAL, STATE_ESCAPING and STATE_QUOTING are used by
+ // splitCsv; STATE_KEY and STATE_VALUE are not referenced in this class.
+ public static final int STATE_NORMAL = 0;
+ public static final int STATE_KEY = 2;
+ public static final int STATE_VALUE = 4;
+ public static final int STATE_ESCAPING = 8;
+ public static final int STATE_QUOTING = 16;
+
+ /**
+ * Splits a single line of text into fields.
+ * Delegates to the multi-line variant with a null line delimiter and returns its first row.
+ * @param text the text to split
+ * @param delimiter the field delimiter
+ * @param escapeChar the escape character, or null to disable escape handling
+ * @param quoteChar the quote character, or null to disable quote handling
+ * @param hasEscapeProcess if false, the escape character is kept in the output in front of
+ * an escaped ordinary, escape or quote character; if true it is dropped
+ * @return the fields of the text
+ */
+ public static String[] splitCsv(@Nonnull String text, @Nonnull Character
delimiter, @Nullable Character escapeChar,
+ @Nullable Character quoteChar, boolean hasEscapeProcess) {
+ String[][] splitResult = splitCsv(text, delimiter, escapeChar,
quoteChar, null, hasEscapeProcess);
+ // defensive guard: the full variant always appends a final row before returning
+ if (splitResult.length == 0) {
+ return new String[0];
+ }
+ return splitResult[0];
+ }
+
+ /**
+ * Splits text into lines of fields, keeping a delimiter that appears at the head of a line.
+ * Delegates to the full variant with deleteHeadDelimiter set to false.
+ * @param text the text to split
+ * @param delimiter the field delimiter
+ * @param escapeChar the escape character, or null to disable escape handling
+ * @param quoteChar the quote character, or null to disable quote handling
+ * @param lineDelimiter the line delimiter, or null to treat the whole text as one line
+ * @param hasEscapeProcess if false, the escape character is kept in the output in front of
+ * an escaped ordinary, escape or quote character; if true it is dropped
+ * @return one array of fields per line
+ */
+ public static String[][] splitCsv(@Nonnull String text, @Nonnull Character
delimiter,
+ @Nullable Character escapeChar, @Nullable Character quoteChar,
@Nullable Character lineDelimiter,
+ boolean hasEscapeProcess) {
+ return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter,
false, hasEscapeProcess);
+ }
+
+ /**
+ * Splits text into lines of fields.
+ * <p>
+ * Outside quotes, the delimiter ends a field and the line delimiter ends a field and a
+ * line. A character that follows the escape character is copied as data; an escaped
+ * delimiter or line delimiter is always copied without the escape character. Between two
+ * quote characters the delimiter, the escape character and the line delimiter are copied
+ * as data, and the quote characters themselves are not copied.
+ * </p>
+ * <p>
+ * After the last character the current field and line are always appended, so the result
+ * has at least one row with at least one field, and text that ends with a line delimiter
+ * yields a final row holding one empty field. An escape or quote that is still open at
+ * the end of the text is not reported as an error.
+ * </p>
+ * @param text the text to split
+ * @param delimiter the field delimiter
+ * @param escapeChar the escape character, or null to disable escape handling
+ * @param quoteChar the quote character, or null to disable quote handling
+ * @param lineDelimiter the line delimiter, or null to treat the whole text as one line
+ * @param deleteHeadDelimiter if true, delimiters met before any field content of a line
+ * are skipped instead of producing empty leading fields
+ * @param hasEscapeProcess if false, the escape character is kept in the output in front of
+ * an escaped ordinary, escape or quote character; if true it is dropped
+ * @return one array of fields per line
+ */
+ public static String[][] splitCsv(@Nonnull String text, @Nonnull Character
delimiter,
+ @Nullable Character escapeChar, @Nullable Character quoteChar,
@Nullable Character lineDelimiter,
+ boolean deleteHeadDelimiter, boolean hasEscapeProcess) {
+ char deli = delimiter.charValue();
+ // placeholder values only: the branches below are still guarded by null checks
+ // on the boxed parameters, so a null parameter disables the feature
+ char escape = (escapeChar == null) ? '\\' : escapeChar.charValue();
+ char quote = (quoteChar == null) ? '\"' : quoteChar.charValue();
+ char line = (lineDelimiter == null) ? '\n' : lineDelimiter.charValue();
+ List<String[]> lines = new ArrayList<>();
+ List<String> fields = new ArrayList<>();
+
+ int state = STATE_NORMAL;
+
+ char[] srcValue = text.toCharArray();
+ // scratch buffer reused for every field; fieldIndex is the length of the current field
+ char[] fieldValue = new char[srcValue.length];
+ int fieldIndex = 0;
+ for (int i = 0; i < text.length(); i++) {
+ char ch = srcValue[i];
+
+ if (ch == deli) {
+ switch (state) {
+ case STATE_NORMAL:
+ // skip a delimiter at the head of a line when requested
+ if (fieldIndex == 0 && deleteHeadDelimiter &&
fields.isEmpty()) {
+ break;
+ }
+ fields.add(new String(fieldValue, 0, fieldIndex));
+ fieldIndex = 0;
+ break;
+ case STATE_ESCAPING:
+ // escaped delimiter: copied as data, escape character not kept
+ fieldValue[fieldIndex++] = ch;
+ state = STATE_NORMAL;
+ break;
+ case STATE_QUOTING:
+ fieldValue[fieldIndex++] = ch;
+ break;
+ default:
+ break;
+ }
+ } else if (escapeChar != null && ch == escape) {
+ switch (state) {
+ case STATE_NORMAL:
+ // the escape character itself is not copied yet; the next character decides
+ state = STATE_ESCAPING;
+ break;
+ case STATE_ESCAPING:
+ if (!hasEscapeProcess) {
+ fieldValue[fieldIndex++] = escapeChar;
+ }
+ fieldValue[fieldIndex++] = ch;
+ state = STATE_NORMAL;
+ break;
+ case STATE_QUOTING:
+ // inside quotes the escape character is plain data
+ fieldValue[fieldIndex++] = ch;
+ break;
+ default:
+ break;
+ }
+ } else if (quoteChar != null && ch == quote) {
+ switch (state) {
+ case STATE_NORMAL:
+ // opening quote: not copied
+ state = STATE_QUOTING;
+ break;
+ case STATE_ESCAPING:
+ if (!hasEscapeProcess) {
+ fieldValue[fieldIndex++] = escapeChar;
+ }
+ fieldValue[fieldIndex++] = ch;
+ state = STATE_NORMAL;
+ break;
+ case STATE_QUOTING:
+ // closing quote: not copied
+ state = STATE_NORMAL;
+ break;
+ default:
+ break;
+ }
+ } else if (lineDelimiter != null && ch == line) {
+ switch (state) {
+ case STATE_NORMAL:
+ // end of line: close the current field, then the current row
+ fields.add(new String(fieldValue, 0, fieldIndex));
+ fieldIndex = 0;
+ lines.add(fields.toArray(new String[0]));
+ fields.clear();
+ break;
+ case STATE_ESCAPING:
+ // escaped line delimiter: copied as data, escape character not kept
+ fieldValue[fieldIndex++] = ch;
+ state = STATE_NORMAL;
+ break;
+ case STATE_QUOTING:
+ fieldValue[fieldIndex++] = ch;
+ break;
+ default:
+ break;
+ }
+ } else {
+ // ordinary character; a pending escape is resolved first
+ if (state == STATE_ESCAPING) {
+ if (!hasEscapeProcess) {
+ fieldValue[fieldIndex++] = escapeChar;
+ }
+ state = STATE_NORMAL;
+ }
+ fieldValue[fieldIndex++] = ch;
+ }
+ }
+ // the last field and row are appended unconditionally
+ fields.add(new String(fieldValue, 0, fieldIndex));
+ fieldIndex = 0;
+ lines.add(fields.toArray(new String[0]));
+
+ String[][] result = new String[lines.size()][];
+ for (int i = 0; i < lines.size(); i++) {
+ result[i] = lines.get(i);
+ }
+ return result;
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
new file mode 100644
index 0000000000..cb3c9405a0
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.charset.Charset;
+import java.util.List;
+
+/**
+ * CsvSinkEncoder
+ * <p>
+ * Encodes one row of {@link SinkData} as a delimited line. Fields are written in the
+ * order configured in {@link CsvSinkInfo#getFields()}. When an escape character is
+ * configured, field values are escaped through {@link EscapeUtils}.
+ * </p>
+ */
+public class CsvSinkEncoder implements SinkEncoder {
+
+    protected CsvSinkInfo sinkInfo;
+    protected Charset sinkCharset = Charset.defaultCharset();
+    private Character delimiter = '|';
+    private Character escapeChar = null;
+    private List<FieldInfo> fields;
+    // reused by every encode() call, so one encoder must not be shared between threads
+    private StringBuilder builder = new StringBuilder();
+
+    /**
+     * Creates an encoder from the sink configuration.
+     * Only the first character of the configured delimiter and escape character is used.
+     * @param sinkInfo the CSV sink configuration
+     */
+    public CsvSinkEncoder(CsvSinkInfo sinkInfo) {
+        this.sinkInfo = sinkInfo;
+        // Emptiness check rather than isBlank: a tab or a space is a legitimate delimiter,
+        // and isBlank would treat it as "not configured" and silently fall back to '|'.
+        if (!StringUtils.isEmpty(sinkInfo.getDelimiter())) {
+            this.delimiter = sinkInfo.getDelimiter().charAt(0);
+        }
+        if (!StringUtils.isBlank(sinkInfo.getEscapeChar())) {
+            this.escapeChar = sinkInfo.getEscapeChar().charAt(0);
+        }
+        if (!StringUtils.isBlank(sinkInfo.getCharset())) {
+            this.sinkCharset = Charset.forName(sinkInfo.getCharset());
+        }
+        this.fields = sinkInfo.getFields();
+    }
+
+    /**
+     * encode
+     * @param sinkData the row to encode
+     * @return the field values joined by the delimiter, or an empty string when no
+     *         fields are configured
+     */
+    @Override
+    public String encode(SinkData sinkData) {
+        if (fields == null || fields.isEmpty()) {
+            return "";
+        }
+        builder.setLength(0);
+        for (FieldInfo field : fields) {
+            String fieldValue = sinkData.getField(field.getName());
+            if (escapeChar == null) {
+                // no escape character configured: the value is written as it is
+                builder.append(fieldValue);
+            } else {
+                EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue);
+            }
+            builder.append(delimiter);
+        }
+        // every field is followed by a delimiter; drop the trailing one
+        return builder.substring(0, builder.length() - 1);
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
new file mode 100644
index 0000000000..2e1c3bea4b
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * DefaultSinkData
+ * <p>
+ * {@link SinkData} implementation that keeps the fields of one row in a {@link HashMap}.
+ * </p>
+ */
+public class DefaultSinkData implements SinkData {
+
+ private Map<String, String> currentRow = new HashMap<>();
+
+ /**
+ * putField
+ * Stores the value under the field name, replacing any value stored before.
+ * @param fieldName the name of the field
+ * @param fieldValue the value of the field
+ */
+ @Override
+ public void putField(String fieldName, String fieldValue) {
+ this.currentRow.put(fieldName, fieldValue);
+ }
+
+ /**
+ * getField
+ * @param fieldName the name of the field
+ * @return the stored value, or an empty string if the field has never been put;
+ * a field that was put with a null value is returned as null
+ */
+ @Override
+ public String getField(String fieldName) {
+ return this.currentRow.getOrDefault(fieldName, "");
+ }
+
+ /**
+ * keySet
+ * @return the key set view of the backing map, i.e. the names of all stored fields
+ */
+ @Override
+ public Set<String> keySet() {
+ return this.currentRow.keySet();
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/EscapeUtils.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/EscapeUtils.java
new file mode 100644
index 0000000000..2ba3c429d1
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/EscapeUtils.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import java.util.List;
+
+/**
+ * EscapeUtils
+ * <p>
+ * Helpers that escape field values before they are joined with a separator.
+ * NUL, line feed and carriage return are written as the escape character followed by
+ * {@code 0}, {@code n} and {@code r}; the separator and the escape character are written
+ * with the escape character in front of them.
+ * </p>
+ */
+public class EscapeUtils {
+
+    /**
+     * Escapes one field value, using the backslash as escape character.
+     * @param fieldValue the value to escape
+     * @param separator the separator that must not appear unescaped
+     * @return the escaped value
+     */
+    public static String escapeStringSeparator(String fieldValue, char separator) {
+        StringBuilder escaped = new StringBuilder();
+        escapeContent(escaped, separator, '\\', fieldValue);
+        return escaped.toString();
+    }
+
+    /**
+     * Escapes every field with the backslash and joins the results with the separator.
+     * @param fields the field values
+     * @param separator the separator written between fields
+     * @return the joined text, or an empty string for an empty list
+     */
+    public static String escapeFields(List<String> fields, char separator) {
+        if (fields.isEmpty()) {
+            return "";
+        }
+        StringBuilder joined = new StringBuilder();
+        boolean first = true;
+        for (String field : fields) {
+            if (!first) {
+                joined.append(separator);
+            }
+            first = false;
+            escapeContent(joined, separator, '\\', field);
+        }
+        return joined.toString();
+    }
+
+    /**
+     * Appends the escaped text form of a field to the builder.
+     * A null field appends nothing.
+     * @param builder the target builder
+     * @param separator the separator that must not appear unescaped
+     * @param escapeChar the escape character
+     * @param field the field; converted with {@link String#valueOf(Object)}
+     */
+    public static void escapeContent(StringBuilder builder, char separator, char escapeChar, Object field) {
+        String text = (field == null) ? "" : String.valueOf(field);
+        for (char current : text.toCharArray()) {
+            putValueIntoStringBuilder(builder, separator, escapeChar, current);
+        }
+    }
+
+    /**
+     * Appends a single character to the builder, escaping it when necessary.
+     * @param builder the target builder
+     * @param separator the separator that must not appear unescaped
+     * @param escapeChar the escape character
+     * @param value the character to append
+     */
+    public static void putValueIntoStringBuilder(StringBuilder builder, char separator, final char escapeChar,
+            char value) {
+        // control characters are checked first, so they win over separator and escape character
+        if (value == 0) {
+            builder.append(escapeChar).append('0');
+        } else if (value == '\n') {
+            builder.append(escapeChar).append('n');
+        } else if (value == '\r') {
+            builder.append(escapeChar).append('r');
+        } else if (value == separator) {
+            builder.append(escapeChar).append(separator);
+        } else if (value == escapeChar) {
+            builder.append(escapeChar).append(escapeChar);
+        } else {
+            builder.append(value);
+        }
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
new file mode 100644
index 0000000000..3fcc31107e
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.charset.Charset;
+import java.util.List;
+
+/**
+ * KvSinkEncoder
+ */
+public class KvSinkEncoder implements SinkEncoder {
+
+ protected KvSinkInfo sinkInfo;
+ protected Charset sinkCharset = Charset.defaultCharset();
+ private List<FieldInfo> fields;
+ private StringBuilder builder = new StringBuilder();
+
+ public KvSinkEncoder(KvSinkInfo sinkInfo) {
+ this.sinkInfo = sinkInfo;
+ if (!StringUtils.isBlank(sinkInfo.getCharset())) {
+ this.sinkCharset = Charset.forName(sinkInfo.getCharset());
+ }
+ this.fields = sinkInfo.getFields();
+ }
+
+ /**
+ * encode
+ * @param sinkData
+ * @return
+ */
+ @Override
+ public String encode(SinkData sinkData) {
+ if (fields == null || fields.size() == 0) {
+ return "";
+ }
+ builder.delete(0, builder.length());
+ for (FieldInfo field : fields) {
+ String fieldName = field.getName();
+ String fieldValue = sinkData.getField(fieldName);
+
builder.append(fieldName).append('=').append(fieldValue).append('&');
+ }
+ return builder.substring(0, builder.length() - 1);
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
new file mode 100644
index 0000000000..037df2dfcf
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import java.util.Set;
+
+/**
+ * SinkData
+ * <p>
+ * One row of output data, held as field values addressed by field name.
+ * </p>
+ */
+public interface SinkData {
+
+ /**
+ * putField
+ * @param fieldName the name of the field
+ * @param fieldValue the value to store for the field
+ */
+ void putField(String fieldName, String fieldValue);
+
+ /**
+ * getField
+ * @param fieldName the name of the field
+ * @return the value of the field; the result for an unknown field is decided by the implementation
+ */
+ String getField(String fieldName);
+
+ /**
+ * keySet
+ * @return the names of the fields held by this row
+ */
+ Set<String> keySet();
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
new file mode 100644
index 0000000000..ab83d21a0b
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+/**
+ * SinkEncoder
+ * <p>
+ * Converts a {@link SinkData} row into its text form for a sink protocol.
+ * </p>
+ */
+public interface SinkEncoder {
+
+ /**
+ * encode
+ * @param sinkData the row to encode
+ * @return the encoded text
+ */
+ String encode(SinkData sinkData);
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java
new file mode 100644
index 0000000000..88dd5bf36c
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSinkInfo.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * CsvSinkInfo
+ * <p>
+ * Configuration of a CSV sink: charset, delimiter, escape character and the list of
+ * fields. It is bound from JSON through the annotated constructor, and unknown JSON
+ * properties are ignored.
+ * </p>
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CsvSinkInfo extends SinkInfo {
+
+ private String delimiter;
+ private String escapeChar;
+ private List<FieldInfo> fields;
+
+ /**
+ * Constructor used by Jackson.
+ * The sink type passed to the parent is the constant SourceInfo.CSV.
+ * @param charset the charset name, passed to the parent class
+ * @param delimiter the field delimiter
+ * @param escapeChar the escape character
+ * @param fields the fields of the sink
+ */
+ @JsonCreator
+ public CsvSinkInfo(
+ @JsonProperty("charset") String charset,
+ @JsonProperty("delimiter") String delimiter,
+ @JsonProperty("escapeChar") String escapeChar,
+ @JsonProperty("fields") List<FieldInfo> fields) {
+ super(SourceInfo.CSV, charset);
+ this.delimiter = delimiter;
+ this.escapeChar = escapeChar;
+ this.fields = fields;
+ }
+
+ /**
+ * get delimiter
+ * @return the delimiter
+ */
+ @JsonProperty("delimiter")
+ public String getDelimiter() {
+ return delimiter;
+ }
+
+ /**
+ * set delimiter
+ * @param delimiter the delimiter to set
+ */
+ public void setDelimiter(String delimiter) {
+ this.delimiter = delimiter;
+ }
+
+ /**
+ * get escapeChar
+ * @return the escapeChar
+ */
+ @JsonProperty("escapeChar")
+ public String getEscapeChar() {
+ return escapeChar;
+ }
+
+ /**
+ * set escapeChar
+ * @param escapeChar the escapeChar to set
+ */
+ public void setEscapeChar(String escapeChar) {
+ this.escapeChar = escapeChar;
+ }
+
+ /**
+ * get fields
+ * @return the fields
+ */
+ @JsonProperty("fields")
+ public List<FieldInfo> getFields() {
+ return fields;
+ }
+
+ /**
+ * set fields
+ * @param fields the fields to set
+ */
+ public void setFields(List<FieldInfo> fields) {
+ this.fields = fields;
+ }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java
new file mode 100644
index 0000000000..27a46bcb19
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/CsvSourceInfo.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * CsvSourceInfo
+ * <p>
+ * Configuration of a CSV source: charset, delimiter, escape character and the list of
+ * fields. It is bound from JSON through the annotated constructor, and unknown JSON
+ * properties are ignored.
+ * </p>
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CsvSourceInfo extends SourceInfo {
+
+ private String delimiter;
+ private String escapeChar;
+ private List<FieldInfo> fields;
+
+ /**
+ * Constructor used by Jackson.
+ * @param charset the charset name, passed to the parent class
+ * @param delimiter the field delimiter
+ * @param escapeChar the escape character
+ * @param fields the fields of the source
+ */
+ @JsonCreator
+ public CsvSourceInfo(
+ @JsonProperty("charset") String charset,
+ @JsonProperty("delimiter") String delimiter,
+ @JsonProperty("escapeChar") String escapeChar,
+ @JsonProperty("fields") List<FieldInfo> fields) {
+ super(charset);
+ this.delimiter = delimiter;
+ this.escapeChar = escapeChar;
+ this.fields = fields;
+ }
+
+ /**
+ * get delimiter
+ * @return the delimiter
+ */
+ @JsonProperty("delimiter")
+ public String getDelimiter() {
+ return delimiter;
+ }
+
+ /**
+ * set delimiter
+ * @param delimiter the delimiter to set
+ */
+ public void setDelimiter(String delimiter) {
+ this.delimiter = delimiter;
+ }
+
+ /**
+ * get escapeChar
+ * @return the escapeChar
+ */
+ @JsonProperty("escapeChar")
+ public String getEscapeChar() {
+ return escapeChar;
+ }
+
+ /**
+ * set escapeChar
+ * @param escapeChar the escapeChar to set
+ */
+ public void setEscapeChar(String escapeChar) {
+ this.escapeChar = escapeChar;
+ }
+
+ /**
+ * get fields
+ * @return the fields
+ */
+ @JsonProperty("fields")
+ public List<FieldInfo> getFields() {
+ return fields;
+ }
+
+ /**
+ * set fields
+ * @param fields the fields to set
+ */
+ public void setFields(List<FieldInfo> fields) {
+ this.fields = fields;
+ }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
new file mode 100644
index 0000000000..46106e534f
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import lombok.Data;
+
+/**
+ * FieldInfo
+ * <p>
+ * Describes one field by its name. Accessors, equals, hashCode and toString are
+ * generated by Lombok's {@code @Data}.
+ * </p>
+ */
+@Data
+public class FieldInfo {
+
+ // the name of the field
+ private String name;
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/JsonSourceInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/JsonSourceInfo.java
new file mode 100644
index 0000000000..9935623d06
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/JsonSourceInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * JsonSourceInfo
+ * <p>
+ * Configuration of a JSON source: charset and rowsNodePath. It is bound from JSON
+ * through the annotated constructor, and unknown JSON properties are ignored.
+ * NOTE(review): rowsNodePath presumably locates the node that holds the rows inside
+ * the source document; confirm against the JSON decoder.
+ * </p>
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class JsonSourceInfo extends SourceInfo {
+
+ private String rowsNodePath;
+
+ /**
+ * Constructor used by Jackson.
+ * @param charset the charset name, passed to the parent class
+ * @param rowsNodePath the rowsNodePath
+ */
+ @JsonCreator
+ public JsonSourceInfo(
+ @JsonProperty("charset") String charset,
+ @JsonProperty("rowsNodePath") String rowsNodePath) {
+ super(charset);
+ this.rowsNodePath = rowsNodePath;
+ }
+
+ /**
+ * get rowsNodePath
+ * @return the rowsNodePath
+ */
+ @JsonProperty("rowsNodePath")
+ public String getRowsNodePath() {
+ return rowsNodePath;
+ }
+
+ /**
+ * set rowsNodePath
+ * @param rowsNodePath the rowsNodePath to set
+ */
+ public void setRowsNodePath(String rowsNodePath) {
+ this.rowsNodePath = rowsNodePath;
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
new file mode 100644
index 0000000000..11c3550f42
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSinkInfo.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * KvSinkInfo
+ * <p>
+ * Configuration of a key-value sink: charset and the list of fields. It is bound from
+ * JSON through the annotated constructor, and unknown JSON properties are ignored.
+ * </p>
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class KvSinkInfo extends SinkInfo {
+
+ private List<FieldInfo> fields;
+
+ /**
+ * Constructor used by Jackson.
+ * The sink type passed to the parent is the constant SourceInfo.KV.
+ * @param charset the charset name, passed to the parent class
+ * @param fields the fields of the sink
+ */
+ @JsonCreator
+ public KvSinkInfo(
+ @JsonProperty("charset") String charset,
+ @JsonProperty("fields") List<FieldInfo> fields) {
+ super(SourceInfo.KV, charset);
+ this.fields = fields;
+ }
+
+ /**
+ * get fields
+ * @return the fields
+ */
+ @JsonProperty("fields")
+ public List<FieldInfo> getFields() {
+ return fields;
+ }
+
+ /**
+ * set fields
+ * @param fields the fields to set
+ */
+ public void setFields(List<FieldInfo> fields) {
+ this.fields = fields;
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java
new file mode 100644
index 0000000000..6d92d44920
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/KvSourceInfo.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * Source description for key-value input: the input charset plus the list of source fields.
+ * Instances are built by Jackson through the annotated constructor; unknown JSON properties are ignored.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class KvSourceInfo extends SourceInfo {
+
+ private List<FieldInfo> fields;
+
+ // The charset is handed to SourceInfo, which substitutes "UTF-8" when it is null.
+ @JsonCreator
+ public KvSourceInfo(
+ @JsonProperty("charset") String charset,
+ @JsonProperty("fields") List<FieldInfo> fields) {
+ super(charset);
+ this.fields = fields;
+ }
+
+ /**
+ * Returns the source field list exactly as stored (no defensive copy is made).
+ * @return the fields
+ */
+ @JsonProperty("fields")
+ public List<FieldInfo> getFields() {
+ return fields;
+ }
+
+ /**
+ * Replaces the source field list; the given list is stored as-is.
+ * @param fields the fields to set
+ */
+ public void setFields(List<FieldInfo> fields) {
+ this.fields = fields;
+ }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/PbSourceInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/PbSourceInfo.java
new file mode 100644
index 0000000000..ba226bd807
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/PbSourceInfo.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Source description carrying a charset, a {@code protoDefine} string and a {@code rowsNodePath} string.
+ * Instances are built by Jackson through the annotated constructor; unknown JSON properties are ignored.
+ * NOTE(review): presumably protoDefine is the protobuf schema and rowsNodePath locates the repeated
+ * row node inside a message - confirm against the PB decoder, which is not part of this class.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PbSourceInfo extends SourceInfo {
+
+ private String protoDefine;
+ private String rowsNodePath;
+
+ // The charset is handed to SourceInfo, which substitutes "UTF-8" when it is null.
+ @JsonCreator
+ public PbSourceInfo(
+ @JsonProperty("charset") String charset,
+ @JsonProperty("protoDefine") String protoDefine,
+ @JsonProperty("rowsNodePath") String rowsNodePath) {
+ super(charset);
+ this.protoDefine = protoDefine;
+ this.rowsNodePath = rowsNodePath;
+ }
+
+ /**
+ * Returns the stored protoDefine value; it may be null because the constructor does not validate it.
+ * @return the protoDefine
+ */
+ @JsonProperty("protoDefine")
+ public String getProtoDefine() {
+ return protoDefine;
+ }
+
+ /**
+ * Replaces the stored protoDefine value without validation.
+ * @param protoDefine the protoDefine to set
+ */
+ public void setProtoDefine(String protoDefine) {
+ this.protoDefine = protoDefine;
+ }
+
+ /**
+ * Returns the stored rowsNodePath value; it may be null because the constructor does not validate it.
+ * @return the rowsNodePath
+ */
+ @JsonProperty("rowsNodePath")
+ public String getRowsNodePath() {
+ return rowsNodePath;
+ }
+
+ /**
+ * Replaces the stored rowsNodePath value without validation.
+ * @param rowsNodePath the rowsNodePath to set
+ */
+ public void setRowsNodePath(String rowsNodePath) {
+ this.rowsNodePath = rowsNodePath;
+ }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ProtocolType.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ProtocolType.java
new file mode 100644
index 0000000000..13e9e2a179
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ProtocolType.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import java.util.Locale;
+
+/**
+ * ProtocolType
+ */
+public enum ProtocolType {
+
+ CSV("csv"), KV("kv"), PB("pb"), JSON("json"), UNKNOWN("n");
+
+ private final String type;
+
+ ProtocolType(String type) {
+ this.type = type;
+ }
+
+ public static ProtocolType forType(String type) {
+ for (ProtocolType dataType : values()) {
+ if (dataType.getType().equals(type.toLowerCase(Locale.ROOT))) {
+ return dataType;
+ }
+ }
+ throw new IllegalArgumentException("Unsupported protocol type for " +
type);
+ }
+
+ public static ProtocolType convert(String value) {
+ for (ProtocolType v : values()) {
+ if (v.getType().equals(value.toLowerCase(Locale.ROOT))) {
+ return v;
+ }
+ }
+ return UNKNOWN;
+ }
+
+ @Override
+ public String toString() {
+ return this.name() + ":" + this.type;
+ }
+
+ public String getType() {
+ return type;
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
new file mode 100644
index 0000000000..9bd4647479
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Base class of all sink descriptions. Jackson picks the concrete subclass from the JSON
+ * "type" property: SourceInfo.CSV selects CsvSinkInfo and SourceInfo.KV selects KvSinkInfo.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
property = "type")
+@JsonSubTypes({
+ @Type(value = CsvSinkInfo.class, name = SourceInfo.CSV),
+ @Type(value = KvSinkInfo.class, name = SourceInfo.KV),
+})
+public abstract class SinkInfo {
+
+ // Excluded from Jackson property handling; the "type" property is written/read as the type id.
+ @JsonIgnore
+ private String type;
+
+ @JsonProperty("charset")
+ private String charset;
+
+ // type must be non-null (checkNotNull); a null charset is replaced by "UTF-8".
+ public SinkInfo(
+ String type,
+ @JsonProperty("charset") String charset) {
+ this.type = checkNotNull(type);
+ this.charset = Optional.ofNullable(charset).orElse("UTF-8");
+ }
+
+ /**
+ * Returns the sink type name supplied by the subclass constructor or by setType.
+ * @return the type
+ */
+ @JsonIgnore
+ public String getType() {
+ return type;
+ }
+
+ /**
+ * Replaces the sink type name; unlike the constructor, no null check is applied here.
+ * @param type the type to set
+ */
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ /**
+ * Returns the charset name; "UTF-8" unless another value was supplied.
+ * @return the charset
+ */
+ @JsonProperty("charset")
+ public String getCharset() {
+ return charset;
+ }
+
+ /**
+ * Replaces the charset name; unlike the constructor, null is stored as-is (no "UTF-8" default).
+ * @param charset the charset to set
+ */
+ public void setCharset(String charset) {
+ this.charset = charset;
+ }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SourceInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SourceInfo.java
new file mode 100644
index 0000000000..a02b17f78a
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SourceInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import java.util.Optional;
+
+/**
+ * Base class of all source descriptions. Jackson picks the concrete subclass from the JSON
+ * "type" property using the names declared below: "csv", "kv", "pb" and "json".
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
property = "type")
+@JsonSubTypes({
+ @Type(value = CsvSourceInfo.class, name = SourceInfo.CSV),
+ @Type(value = KvSourceInfo.class, name = SourceInfo.KV),
+ @Type(value = PbSourceInfo.class, name = SourceInfo.PB),
+ @Type(value = JsonSourceInfo.class, name = SourceInfo.JSON),
+})
+public abstract class SourceInfo {
+
+ // NOTE(review): "SEPARTOR" is a misspelling of "SEPARATOR"; the constant is public, so renaming
+ // it would break any external reference - keep the name or add a correctly spelled alias.
+ public static final String NODE_PATH_SEPARTOR = ".";
+
+ // Type ids used in the "type" JSON property; SinkInfo reuses CSV and KV for its own subtypes.
+ public static final String CSV = "csv";
+ public static final String KV = "kv";
+ public static final String PB = "pb";
+ public static final String JSON = "json";
+
+ private String charset;
+
+ // A null charset is replaced by "UTF-8".
+ public SourceInfo(
+ @JsonProperty("charset") String charset) {
+ this.charset = Optional.ofNullable(charset).orElse("UTF-8");
+ }
+
+ /**
+ * Returns the charset name; "UTF-8" unless another value was supplied.
+ * @return the charset
+ */
+ public String getCharset() {
+ return charset;
+ }
+
+ /**
+ * Replaces the charset name; unlike the constructor, null is stored as-is (no "UTF-8" default).
+ * @param charset the charset to set
+ */
+ public void setCharset(String charset) {
+ this.charset = charset;
+ }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
new file mode 100644
index 0000000000..ff1ac958fc
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Complete configuration of one transformation: where the data comes from (sourceInfo),
+ * where it goes (sinkInfo) and the SQL statement applied in between (transformSql).
+ * Instances are built by Jackson through the annotated constructor; none of the values is validated here.
+ */
+public class TransformConfig {
+
+ @JsonProperty("sourceInfo")
+ private SourceInfo sourceInfo;
+ @JsonProperty("sinkInfo")
+ private SinkInfo sinkInfo;
+ @JsonProperty("transformSql")
+ private String transformSql;
+
+ // Stores the three values as given; null is accepted for each of them.
+ @JsonCreator
+ public TransformConfig(
+ @JsonProperty("sourceInfo") SourceInfo sourceInfo,
+ @JsonProperty("sinkInfo") SinkInfo sinkInfo,
+ @JsonProperty("transformSql") String transformSql) {
+ this.sourceInfo = sourceInfo;
+ this.sinkInfo = sinkInfo;
+ this.transformSql = transformSql;
+ }
+
+ /**
+ * Returns the source description.
+ * @return the sourceInfo
+ */
+ @JsonProperty("sourceInfo")
+ public SourceInfo getSourceInfo() {
+ return sourceInfo;
+ }
+
+ /**
+ * Replaces the source description.
+ * @param sourceInfo the sourceInfo to set
+ */
+ public void setSourceInfo(SourceInfo sourceInfo) {
+ this.sourceInfo = sourceInfo;
+ }
+
+ /**
+ * Returns the sink description.
+ * @return the sinkInfo
+ */
+ @JsonProperty("sinkInfo")
+ public SinkInfo getSinkInfo() {
+ return sinkInfo;
+ }
+
+ /**
+ * Replaces the sink description.
+ * @param sinkInfo the sinkInfo to set
+ */
+ public void setSinkInfo(SinkInfo sinkInfo) {
+ this.sinkInfo = sinkInfo;
+ }
+
+ /**
+ * Returns the SQL text of the transformation.
+ * @return the transformSql
+ */
+ @JsonProperty("transformSql")
+ public String getTransformSql() {
+ return transformSql;
+ }
+
+ /**
+ * Replaces the SQL text of the transformation.
+ * @param transformSql the transformSql to set
+ */
+ public void setTransformSql(String transformSql) {
+ this.transformSql = transformSql;
+ }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
new file mode 100644
index 0000000000..c08a6b3bc5
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process;
+
+import org.apache.inlong.sdk.transform.decode.CsvSourceDecoder;
+import org.apache.inlong.sdk.transform.decode.KvSourceDecoder;
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.decode.SourceDecoder;
+import org.apache.inlong.sdk.transform.encode.CsvSinkEncoder;
+import org.apache.inlong.sdk.transform.encode.DefaultSinkData;
+import org.apache.inlong.sdk.transform.encode.KvSinkEncoder;
+import org.apache.inlong.sdk.transform.encode.SinkData;
+import org.apache.inlong.sdk.transform.encode.SinkEncoder;
+import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.SinkInfo;
+import org.apache.inlong.sdk.transform.pojo.SourceInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.operator.ExpressionOperator;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserManager;
+import net.sf.jsqlparser.statement.select.PlainSelect;
+import net.sf.jsqlparser.statement.select.Select;
+import net.sf.jsqlparser.statement.select.SelectExpressionItem;
+import net.sf.jsqlparser.statement.select.SelectItem;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.StringReader;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * TransformProcessor
+ * <p>
+ * Decodes raw source data into rows, drops the rows rejected by the WHERE clause of the configured
+ * SQL, evaluates every SELECT item for the remaining rows and encodes each result row with the
+ * sink encoder.
+ */
+public class TransformProcessor {
+
+    private TransformConfig config;
+    private SourceDecoder decoder;
+    private SinkEncoder encoder;
+    private Charset srcCharset = Charset.defaultCharset();
+    protected Charset sinkCharset = Charset.defaultCharset();
+
+    private PlainSelect transformSelect;
+    private ExpressionOperator where;
+    private Map<String, ValueParser> selectItemMap;
+
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Creates a processor from a JSON document that maps onto {@link TransformConfig}.
+     *
+     * @param configString the JSON configuration
+     * @throws JsonMappingException if the JSON cannot be mapped onto the configuration classes
+     * @throws JsonProcessingException if the JSON cannot be parsed
+     * @throws JSQLParserException if the transform SQL cannot be parsed
+     * @throws IllegalArgumentException if the source or sink type has no decoder/encoder
+     */
+    public TransformProcessor(String configString)
+            throws JsonMappingException, JsonProcessingException, JSQLParserException {
+        TransformConfig config = this.objectMapper.readValue(configString, TransformConfig.class);
+        this.init(config);
+    }
+
+    /**
+     * Creates a processor from an already built configuration.
+     *
+     * @param config the configuration; its source and sink descriptions must not be null
+     * @throws JSQLParserException if the transform SQL cannot be parsed
+     * @throws IllegalArgumentException if the source or sink type has no decoder/encoder
+     */
+    public TransformProcessor(TransformConfig config) throws JSQLParserException {
+        this.init(config);
+    }
+
+    /**
+     * Resolves the charsets, then builds the decoder, the encoder and the SQL evaluation plan.
+     */
+    private void init(TransformConfig config) throws JSQLParserException {
+        this.config = config;
+        // A blank charset keeps the JVM default assigned by the field initializers.
+        if (!StringUtils.isBlank(config.getSourceInfo().getCharset())) {
+            this.srcCharset = Charset.forName(config.getSourceInfo().getCharset());
+        }
+        if (!StringUtils.isBlank(config.getSinkInfo().getCharset())) {
+            this.sinkCharset = Charset.forName(config.getSinkInfo().getCharset());
+        }
+        this.initDecoder(config);
+        this.initEncoder(config);
+        this.initTransformSql();
+    }
+
+    /**
+     * Selects the decoder matching the source description.
+     *
+     * @throws IllegalArgumentException if no decoder exists for the source type; failing here
+     *         replaces the NullPointerException that would otherwise surface later in transform
+     */
+    private void initDecoder(TransformConfig config) {
+        SourceInfo sourceInfo = config.getSourceInfo();
+        if (sourceInfo instanceof CsvSourceInfo) {
+            this.decoder = new CsvSourceDecoder((CsvSourceInfo) sourceInfo);
+        } else if (sourceInfo instanceof KvSourceInfo) {
+            this.decoder = new KvSourceDecoder((KvSourceInfo) sourceInfo);
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported source type: " + sourceInfo.getClass().getName());
+        }
+    }
+
+    /**
+     * Selects the encoder matching the sink description.
+     *
+     * @throws IllegalArgumentException if no encoder exists for the sink type; failing here
+     *         replaces the NullPointerException that would otherwise surface later in transform
+     */
+    private void initEncoder(TransformConfig config) {
+        SinkInfo sinkInfo = config.getSinkInfo();
+        if (sinkInfo instanceof CsvSinkInfo) {
+            this.encoder = new CsvSinkEncoder((CsvSinkInfo) sinkInfo);
+        } else if (sinkInfo instanceof KvSinkInfo) {
+            this.encoder = new KvSinkEncoder((KvSinkInfo) sinkInfo);
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported sink type: " + sinkInfo.getClass().getName());
+        }
+    }
+
+    /**
+     * Parses the transform SQL and prepares the WHERE operator and one value parser per SELECT item.
+     * Items that are not plain expressions are skipped.
+     */
+    private void initTransformSql() throws JSQLParserException {
+        CCJSqlParserManager parserManager = new CCJSqlParserManager();
+        Select select = (Select) parserManager.parse(new StringReader(config.getTransformSql()));
+        this.transformSelect = (PlainSelect) select.getSelectBody();
+        this.where = OperatorTools.buildOperator(this.transformSelect.getWhere());
+        List<SelectItem> items = this.transformSelect.getSelectItems();
+        // Keyed by output field name; a HashMap, so the iteration order is unspecified.
+        this.selectItemMap = new HashMap<>(items.size());
+        for (SelectItem item : items) {
+            if (item instanceof SelectExpressionItem) {
+                SelectExpressionItem exprItem = (SelectExpressionItem) item;
+                // The alias names the output field; without one the expression text is used.
+                String fieldName = exprItem.getAlias() == null
+                        ? exprItem.toString()
+                        : exprItem.getAlias().getName();
+                this.selectItemMap.put(fieldName, OperatorTools.buildParser(exprItem.getExpression()));
+            }
+        }
+    }
+
+    /**
+     * Transforms one source message.
+     *
+     * @param srcBytes the raw source message
+     * @param extParams extra parameters handed to the decoder unchanged
+     * @return one encoded string per source row that passes the WHERE clause, in row order
+     */
+    public List<String> transform(byte[] srcBytes, Map<String, Object> extParams) {
+        SourceData sourceData = this.decoder.decode(srcBytes, extParams);
+        List<String> sinkDatas = new ArrayList<>(sourceData.getRowCount());
+        // Row indexes handed to operators and parsers start at 1.
+        for (int i = 1; i <= sourceData.getRowCount(); i++) {
+            if (this.where != null && !this.where.check(sourceData, i)) {
+                continue;
+            }
+            SinkData sinkData = new DefaultSinkData();
+            for (Entry<String, ValueParser> entry : this.selectItemMap.entrySet()) {
+                String fieldName = entry.getKey();
+                Object fieldValue = entry.getValue().parse(sourceData, i);
+                sinkData.putField(fieldName, String.valueOf(fieldValue));
+            }
+            sinkDatas.add(this.encoder.encode(sinkData));
+        }
+        return sinkDatas;
+    }
+
+    /**
+     * Transforms one source message given as text; the text is converted to bytes with the
+     * source charset before decoding.
+     *
+     * @param srcString the source message
+     * @param extParams extra parameters handed to the decoder unchanged
+     * @return one encoded string per source row that passes the WHERE clause, in row order
+     */
+    public List<String> transform(String srcString, Map<String, Object> extParams) {
+        return this.transform(srcString.getBytes(this.srcCharset), extParams);
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
new file mode 100644
index 0000000000..c6464f850d
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/AndOperator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
+
+/**
+ * AndOperator
+ * <p>
+ * Condition that holds only when both sub-conditions of a parsed {@link AndExpression} hold.
+ */
+public class AndOperator implements ExpressionOperator {
+
+    private ExpressionOperator left;
+    private ExpressionOperator right;
+
+    /**
+     * Builds the operators for both sides of the expression, left side first.
+     * @param expr the parsed AND expression
+     */
+    public AndOperator(AndExpression expr) {
+        this.left = OperatorTools.buildOperator(expr.getLeftExpression());
+        this.right = OperatorTools.buildOperator(expr.getRightExpression());
+    }
+
+    /**
+     * Evaluates the left side and, only if it holds, the right side.
+     * @param sourceData the decoded source data
+     * @param rowIndex the index of the row to test
+     * @return true if both sides accept the row
+     */
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        if (!this.left.check(sourceData, rowIndex)) {
+            return false;
+        }
+        return this.right.check(sourceData, rowIndex);
+    }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
new file mode 100644
index 0000000000..3172626000
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
+import org.apache.commons.lang.ObjectUtils;
+
+/**
+ * EqualsToOperator
+ *
+ */
+public class EqualsToOperator implements ExpressionOperator {
+
+ private ValueParser left;
+ private ValueParser right;
+
+ public EqualsToOperator(EqualsTo expr) {
+ this.left = OperatorTools.buildParser(expr.getLeftExpression());
+ this.right = OperatorTools.buildParser(expr.getRightExpression());
+ }
+
+ /**
+ * check
+ * @param sourceData
+ * @param rowIndex
+ * @return
+ */
+ @Override
+ public boolean check(SourceData sourceData, int rowIndex) {
+ return ObjectUtils.equals(this.left.parse(sourceData, rowIndex),
this.right.parse(sourceData, rowIndex));
+ }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java
new file mode 100644
index 0000000000..b055e841e2
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ExpressionOperator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+/**
+ * ExpressionOperator
+ * <p>
+ * A boolean condition evaluated against a single row of decoded source data.
+ */
+public interface ExpressionOperator {
+
+ /**
+ * Tests one row of the source data against this condition.
+ * @param sourceData the decoded source data
+ * @param rowIndex the index of the row to test
+ * @return true if the row satisfies the condition
+ */
+ boolean check(SourceData sourceData, int rowIndex);
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
new file mode 100644
index 0000000000..07da9d79c2
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
+import org.apache.commons.lang.ObjectUtils;
+
+/**
+ * GreaterThanEqualsOperator
+ *
+ */
+public class GreaterThanEqualsOperator implements ExpressionOperator {
+
+ private ValueParser left;
+ private ValueParser right;
+
+ public GreaterThanEqualsOperator(GreaterThanEquals expr) {
+ this.left = OperatorTools.buildParser(expr.getLeftExpression());
+ this.right = OperatorTools.buildParser(expr.getRightExpression());
+ }
+
+ /**
+ * check
+ * @param sourceData
+ * @param rowIndex
+ * @return
+ */
+ @SuppressWarnings("rawtypes")
+ @Override
+ public boolean check(SourceData sourceData, int rowIndex) {
+ return ObjectUtils.compare((Comparable) this.left.parse(sourceData,
rowIndex),
+ (Comparable) this.right.parse(sourceData, rowIndex)) >= 0;
+ }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
new file mode 100644
index 0000000000..3b2158d96b
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
+import org.apache.commons.lang.ObjectUtils;
+
+/**
+ * GreaterThanOperator
+ *
+ */
+public class GreaterThanOperator implements ExpressionOperator {
+
+ private ValueParser left;
+ private ValueParser right;
+
+ public GreaterThanOperator(GreaterThan expr) {
+ this.left = OperatorTools.buildParser(expr.getLeftExpression());
+ this.right = OperatorTools.buildParser(expr.getRightExpression());
+ }
+
+ /**
+ * check
+ * @param sourceData
+ * @param rowIndex
+ * @return
+ */
+ @SuppressWarnings("rawtypes")
+ @Override
+ public boolean check(SourceData sourceData, int rowIndex) {
+ return ObjectUtils.compare((Comparable) this.left.parse(sourceData,
rowIndex),
+ (Comparable) this.right.parse(sourceData, rowIndex)) > 0;
+ }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
new file mode 100644
index 0000000000..fec4ed8019
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
+import org.apache.commons.lang.ObjectUtils;
+
+/**
+ * Evaluates the SQL {@code <=} predicate for one row of source data.
+ * <p>
+ * The two operand parsers are built once, from the parsed expression, and are
+ * re-evaluated on every call to {@link #check(SourceData, int)}.
+ */
+public class MinorThanEqualsOperator implements ExpressionOperator {
+
+    private final ValueParser leftParser;
+    private final ValueParser rightParser;
+
+    /**
+     * Builds the operand parsers from both sides of the expression.
+     *
+     * @param expr parsed {@code left <= right} expression
+     */
+    public MinorThanEqualsOperator(MinorThanEquals expr) {
+        this.leftParser = OperatorTools.buildParser(expr.getLeftExpression());
+        this.rightParser = OperatorTools.buildParser(expr.getRightExpression());
+    }
+
+    /**
+     * Tests whether the left operand is less than or equal to the right one.
+     * <p>
+     * Ordering is delegated to {@code ObjectUtils.compare}, which (per the
+     * commons-lang documentation) orders {@code null} before any non-null value.
+     * <p>
+     * NOTE(review): both values are cast to raw {@code Comparable}; operands of
+     * different runtime types (e.g. a String field against a Long literal) would
+     * presumably fail inside {@code compareTo} - confirm against callers.
+     *
+     * @param sourceData decoded source rows
+     * @param rowIndex index of the row to evaluate
+     * @return {@code true} if the comparison result is zero or negative
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        // Evaluate left before right, exactly one parse per operand.
+        Comparable leftValue = (Comparable) this.leftParser.parse(sourceData, rowIndex);
+        Comparable rightValue = (Comparable) this.rightParser.parse(sourceData, rowIndex);
+        int order = ObjectUtils.compare(leftValue, rightValue);
+        return order <= 0;
+    }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
new file mode 100644
index 0000000000..5d9db7dd9c
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.operators.relational.MinorThan;
+import org.apache.commons.lang.ObjectUtils;
+
+/**
+ * Evaluates the SQL {@code <} predicate for one row of source data.
+ * <p>
+ * The two operand parsers are built once, from the parsed expression, and are
+ * re-evaluated on every call to {@link #check(SourceData, int)}.
+ */
+public class MinorThanOperator implements ExpressionOperator {
+
+    private final ValueParser leftParser;
+    private final ValueParser rightParser;
+
+    /**
+     * Builds the operand parsers from both sides of the expression.
+     *
+     * @param expr parsed {@code left < right} expression
+     */
+    public MinorThanOperator(MinorThan expr) {
+        this.leftParser = OperatorTools.buildParser(expr.getLeftExpression());
+        this.rightParser = OperatorTools.buildParser(expr.getRightExpression());
+    }
+
+    /**
+     * Tests whether the left operand is strictly less than the right one.
+     * <p>
+     * Ordering is delegated to {@code ObjectUtils.compare}, which (per the
+     * commons-lang documentation) orders {@code null} before any non-null value.
+     * <p>
+     * NOTE(review): both values are cast to raw {@code Comparable}; operands of
+     * different runtime types (e.g. a String field against a Long literal) would
+     * presumably fail inside {@code compareTo} - confirm against callers.
+     *
+     * @param sourceData decoded source rows
+     * @param rowIndex index of the row to evaluate
+     * @return {@code true} if the comparison result is negative
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        // Evaluate left before right, exactly one parse per operand.
+        Comparable leftValue = (Comparable) this.leftParser.parse(sourceData, rowIndex);
+        Comparable rightValue = (Comparable) this.rightParser.parse(sourceData, rowIndex);
+        int order = ObjectUtils.compare(leftValue, rightValue);
+        return order < 0;
+    }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
new file mode 100644
index 0000000000..9c58e70476
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
+import org.apache.commons.lang.ObjectUtils;
+
+/**
+ * Evaluates the SQL inequality predicate ({@code <>} / {@code !=}) for one row
+ * of source data.
+ */
+public class NotEqualsToOperator implements ExpressionOperator {
+
+    private final ValueParser leftParser;
+    private final ValueParser rightParser;
+
+    /**
+     * Builds the operand parsers from both sides of the expression.
+     *
+     * @param expr parsed inequality expression
+     */
+    public NotEqualsToOperator(NotEqualsTo expr) {
+        this.leftParser = OperatorTools.buildParser(expr.getLeftExpression());
+        this.rightParser = OperatorTools.buildParser(expr.getRightExpression());
+    }
+
+    /**
+     * Tests whether the two operands differ for the given row.
+     * <p>
+     * Equality is decided by {@code ObjectUtils.equals}, so the raw parsed
+     * objects are compared without any type conversion.
+     *
+     * @param sourceData decoded source rows
+     * @param rowIndex index of the row to evaluate
+     * @return {@code true} if the operands are not equal
+     */
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        Object leftValue = this.leftParser.parse(sourceData, rowIndex);
+        Object rightValue = this.rightParser.parse(sourceData, rowIndex);
+        boolean same = ObjectUtils.equals(leftValue, rightValue);
+        return !same;
+    }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
new file mode 100644
index 0000000000..f648d426e7
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotOperator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+import net.sf.jsqlparser.expression.NotExpression;
+
+/**
+ * Logical negation of a nested operator (SQL {@code NOT}).
+ */
+public class NotOperator implements ExpressionOperator {
+
+    private final ExpressionOperator operand;
+
+    /**
+     * Builds the nested operator from the negated expression.
+     *
+     * @param expr parsed {@code NOT} expression
+     */
+    public NotOperator(NotExpression expr) {
+        this.operand = OperatorTools.buildOperator(expr.getExpression());
+    }
+
+    /**
+     * Evaluates the nested operator and inverts its result.
+     *
+     * @param sourceData decoded source rows
+     * @param rowIndex index of the row to evaluate
+     * @return {@code true} if the nested operator rejects the row
+     */
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        boolean matched = this.operand.check(sourceData, rowIndex);
+        return !matched;
+    }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
new file mode 100644
index 0000000000..c0e059f266
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
+import org.apache.inlong.sdk.transform.process.parser.LongParser;
+import org.apache.inlong.sdk.transform.process.parser.StringParser;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.LongValue;
+import net.sf.jsqlparser.expression.NotExpression;
+import net.sf.jsqlparser.expression.Parenthesis;
+import net.sf.jsqlparser.expression.StringValue;
+import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
+import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
+import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
+import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
+import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
+import net.sf.jsqlparser.expression.operators.relational.MinorThan;
+import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
+import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
+import net.sf.jsqlparser.schema.Column;
+
+/**
+ * Factory helpers that map parsed SQL expressions onto the operator and value
+ * parser implementations of this package.
+ */
+public class OperatorTools {
+
+    /**
+     * Creates the operator matching the concrete type of a condition expression.
+     * <p>
+     * Types are tested in a fixed order: AND, OR, parenthesis, NOT, then the
+     * relational operators.
+     *
+     * @param expr condition expression
+     * @return the matching operator, or {@code null} if {@code expr} is
+     *         {@code null} or of an unsupported type
+     */
+    public static ExpressionOperator buildOperator(Expression expr) {
+        // Logical connectives and grouping.
+        if (expr instanceof AndExpression) {
+            return new AndOperator((AndExpression) expr);
+        }
+        if (expr instanceof OrExpression) {
+            return new OrOperator((OrExpression) expr);
+        }
+        if (expr instanceof Parenthesis) {
+            return new ParenthesisOperator((Parenthesis) expr);
+        }
+        if (expr instanceof NotExpression) {
+            return new NotOperator((NotExpression) expr);
+        }
+        // Relational comparisons.
+        if (expr instanceof EqualsTo) {
+            return new EqualsToOperator((EqualsTo) expr);
+        }
+        if (expr instanceof NotEqualsTo) {
+            return new NotEqualsToOperator((NotEqualsTo) expr);
+        }
+        if (expr instanceof GreaterThan) {
+            return new GreaterThanOperator((GreaterThan) expr);
+        }
+        if (expr instanceof GreaterThanEquals) {
+            return new GreaterThanEqualsOperator((GreaterThanEquals) expr);
+        }
+        if (expr instanceof MinorThan) {
+            return new MinorThanOperator((MinorThan) expr);
+        }
+        if (expr instanceof MinorThanEquals) {
+            return new MinorThanEqualsOperator((MinorThanEquals) expr);
+        }
+        // Anything else is not supported.
+        return null;
+    }
+
+    /**
+     * Creates the value parser matching the concrete type of an operand
+     * expression: a column reference, a string literal or a long literal.
+     *
+     * @param expr operand expression
+     * @return the matching parser, or {@code null} if {@code expr} is
+     *         {@code null} or of an unsupported type
+     */
+    public static ValueParser buildParser(Expression expr) {
+        if (expr instanceof Column) {
+            return new ColumnParser((Column) expr);
+        }
+        if (expr instanceof StringValue) {
+            return new StringParser((StringValue) expr);
+        }
+        if (expr instanceof LongValue) {
+            return new LongParser((LongValue) expr);
+        }
+        return null;
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
new file mode 100644
index 0000000000..33b9f82bdc
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OrOperator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
+
+/**
+ * Logical disjunction of two nested operators (SQL {@code OR}).
+ */
+public class OrOperator implements ExpressionOperator {
+
+    private final ExpressionOperator leftOperand;
+    private final ExpressionOperator rightOperand;
+
+    /**
+     * Builds the nested operators from both sides of the expression.
+     *
+     * @param expr parsed {@code OR} expression
+     */
+    public OrOperator(OrExpression expr) {
+        this.leftOperand = OperatorTools.buildOperator(expr.getLeftExpression());
+        this.rightOperand = OperatorTools.buildOperator(expr.getRightExpression());
+    }
+
+    /**
+     * Tests whether at least one side accepts the row.
+     * <p>
+     * The right side is only evaluated when the left side rejects the row.
+     *
+     * @param sourceData decoded source rows
+     * @param rowIndex index of the row to evaluate
+     * @return {@code true} if either nested operator accepts the row
+     */
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        if (this.leftOperand.check(sourceData, rowIndex)) {
+            return true;
+        }
+        return this.rightOperand.check(sourceData, rowIndex);
+    }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
new file mode 100644
index 0000000000..111f6bbb21
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/ParenthesisOperator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.operator;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+import net.sf.jsqlparser.expression.Parenthesis;
+
+/**
+ * Transparent wrapper for a parenthesised condition: the result is exactly the
+ * result of the enclosed operator.
+ */
+public class ParenthesisOperator implements ExpressionOperator {
+
+    private final ExpressionOperator inner;
+
+    /**
+     * Builds the operator for the expression inside the parentheses.
+     *
+     * @param expr parsed parenthesised expression
+     */
+    public ParenthesisOperator(Parenthesis expr) {
+        this.inner = OperatorTools.buildOperator(expr.getExpression());
+    }
+
+    /**
+     * Delegates the evaluation to the enclosed operator.
+     *
+     * @param sourceData decoded source rows
+     * @param rowIndex index of the row to evaluate
+     * @return the enclosed operator's result, unchanged
+     */
+    @Override
+    public boolean check(SourceData sourceData, int rowIndex) {
+        boolean matched = this.inner.check(sourceData, rowIndex);
+        return matched;
+    }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
new file mode 100644
index 0000000000..b6dd579d2a
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+import net.sf.jsqlparser.schema.Column;
+
+/**
+ * Value parser for a column reference: looks the column up, by name, in the
+ * source data.
+ */
+public class ColumnParser implements ValueParser {
+
+    private final String columnName;
+
+    /**
+     * Remembers the name of the referenced column.
+     *
+     * @param expr parsed column reference
+     */
+    public ColumnParser(Column expr) {
+        this.columnName = expr.getColumnName();
+    }
+
+    /**
+     * Reads the referenced field from one row.
+     *
+     * @param sourceData decoded source rows
+     * @param rowIndex index of the row to read
+     * @return whatever {@code sourceData.getField} yields for that row and name
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex) {
+        Object fieldValue = sourceData.getField(rowIndex, this.columnName);
+        return fieldValue;
+    }
+
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java
new file mode 100644
index 0000000000..efd61cc2cb
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/LongParser.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+import net.sf.jsqlparser.expression.LongValue;
+
+/**
+ * Value parser for an integer literal: always yields the constant captured at
+ * construction time.
+ */
+public class LongParser implements ValueParser {
+
+    private final Long constant;
+
+    /**
+     * Captures the literal's numeric value.
+     *
+     * @param expr parsed long literal
+     */
+    public LongParser(LongValue expr) {
+        this.constant = expr.getValue();
+    }
+
+    /**
+     * Returns the captured literal; neither argument is consulted.
+     *
+     * @param sourceData decoded source rows (unused)
+     * @param rowIndex index of the current row (unused)
+     * @return the literal value as a {@code Long}
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex) {
+        return this.constant;
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java
new file mode 100644
index 0000000000..9cb431c1fa
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/StringParser.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+import net.sf.jsqlparser.expression.StringValue;
+
+/**
+ * Value parser for a string literal: always yields the constant captured at
+ * construction time.
+ */
+public class StringParser implements ValueParser {
+
+    private final String constant;
+
+    /**
+     * Captures the literal's text.
+     *
+     * @param expr parsed string literal
+     */
+    public StringParser(StringValue expr) {
+        this.constant = expr.getValue();
+    }
+
+    /**
+     * Returns the captured literal; neither argument is consulted.
+     *
+     * @param sourceData decoded source rows (unused)
+     * @param rowIndex index of the current row (unused)
+     * @return the literal text
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex) {
+        return this.constant;
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java
new file mode 100644
index 0000000000..bafafe276c
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ValueParser.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.parser;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+
+/**
+ * ValueParser
+ * <p>
+ * Produces the value of one operand of a condition for a given row of source
+ * data. Implementations in this package either read a field from the row or
+ * return a literal constant.
+ */
+public interface ValueParser {
+
+ /**
+ * Evaluates this operand for one row.
+ *
+ * @param sourceData decoded source rows
+ * @param rowIndex index of the row to evaluate
+ * @return the operand value for that row
+ */
+ Object parse(SourceData sourceData, int rowIndex);
+}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java
new file mode 100644
index 0000000000..3e36bfda26
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serialization tests for {@link TransformConfig} with each kind of source
+ * description.
+ * <p>
+ * Every test lets exceptions propagate so that a serialization failure fails
+ * the test instead of being printed and ignored.
+ */
+public class TestTransformConfig {
+
+    /** SQL statement shared by all tests. */
+    private static final String TRANSFORM_SQL = "select ftime from source";
+
+    /** JSON fragment every serialized config must contain for {@link #TRANSFORM_SQL}. */
+    private static final String TRANSFORM_SQL_JSON = "\"transformSql\":\"select ftime from source\"";
+
+    /**
+     * CSV source to CSV sink: the serialized form must match exactly.
+     *
+     * @throws Exception if the config cannot be serialized
+     */
+    @Test
+    public void testCsv() throws Exception {
+        List<FieldInfo> fields = ftimeFields();
+        SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", fields);
+        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        String configString = serialize(new TransformConfig(csvSource, csvSink, TRANSFORM_SQL));
+        String expected = "{\"sourceInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\","
+                + "\"delimiter\":\"|\",\"escapeChar\":\"\\\\\",\"fields\":[{\"name\":\"ftime\"}]},"
+                + "\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\","
+                + "\"delimiter\":\"|\",\"escapeChar\":\"\\\\\",\"fields\":[{\"name\":\"ftime\"}]},"
+                + "\"transformSql\":\"select ftime from source\"}";
+        // JUnit expects (expected, actual); the reverse order garbles failure messages.
+        Assert.assertEquals(expected, configString);
+    }
+
+    /**
+     * KV source to KV sink: serialization must succeed and carry the SQL.
+     *
+     * @throws Exception if the config cannot be serialized
+     */
+    @Test
+    public void testKv() throws Exception {
+        List<FieldInfo> fields = ftimeFields();
+        SourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
+        SinkInfo kvSink = new KvSinkInfo("UTF-8", fields);
+        String configString = serialize(new TransformConfig(kvSource, kvSink, TRANSFORM_SQL));
+        Assert.assertTrue(configString.contains(TRANSFORM_SQL_JSON));
+    }
+
+    /**
+     * Protobuf source to CSV sink: serialization must succeed and carry the SQL.
+     *
+     * @throws Exception if the config cannot be serialized
+     */
+    @Test
+    public void testPb() throws Exception {
+        List<FieldInfo> fields = ftimeFields();
+        SourceInfo pbSource = new PbSourceInfo("UTF-8", "syntax = \"proto3\";", "root");
+        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        String configString = serialize(new TransformConfig(pbSource, csvSink, TRANSFORM_SQL));
+        Assert.assertTrue(configString.contains(TRANSFORM_SQL_JSON));
+    }
+
+    /**
+     * JSON source to CSV sink: serialization must succeed and carry the SQL.
+     *
+     * @throws Exception if the config cannot be serialized
+     */
+    @Test
+    public void testJson() throws Exception {
+        List<FieldInfo> fields = ftimeFields();
+        SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "root");
+        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        String configString = serialize(new TransformConfig(jsonSource, csvSink, TRANSFORM_SQL));
+        Assert.assertTrue(configString.contains(TRANSFORM_SQL_JSON));
+    }
+
+    /** Builds the single-element field list ({@code ftime}) used by every test. */
+    private static List<FieldInfo> ftimeFields() {
+        FieldInfo ftime = new FieldInfo();
+        ftime.setName("ftime");
+        List<FieldInfo> fields = new ArrayList<>();
+        fields.add(ftime);
+        return fields;
+    }
+
+    /** Serializes the config with a default {@link ObjectMapper} and echoes the JSON. */
+    private static String serialize(TransformConfig config) throws Exception {
+        String configString = new ObjectMapper().writeValueAsString(config);
+        System.out.println(configString);
+        return configString;
+    }
+}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
new file mode 100644
index 0000000000..9c75456df6
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process;
+
+import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.SinkInfo;
+import org.apache.inlong.sdk.transform.pojo.SourceInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Tests for {@link TransformProcessor}: CSV-to-KV and KV-to-CSV transformation driven by a
+ * single SQL statement, with the configuration supplied either as a {@link TransformConfig}
+ * object or as a JSON string.
+ *
+ * <p>Every test declares {@code throws Exception} so that a failure while building the
+ * processor or transforming a record fails the test instead of being printed and ignored.
+ */
+public class TestTransformProcessor {
+
+    /** Sample record in CSV form ({@code |} delimiter). */
+    private static final String CSV_RECORD = "2024-04-28 00:00:00|ok";
+
+    /** The same sample record in KV form. */
+    private static final String KV_RECORD = "ftime=2024-04-28 00:00:00&extinfo=ok";
+
+    /** SQL whose {@code where} clause accepts the sample record. */
+    private static final String SQL_MATCH =
+            "select ftime,extinfo from source where extinfo='ok'";
+
+    /** SQL whose {@code where} clause rejects the sample record. */
+    private static final String SQL_NO_MATCH =
+            "select ftime,extinfo from source where extinfo!='ok'";
+
+    /**
+     * CSV source to KV sink: a matching record is emitted in KV form, a non-matching
+     * record produces no output.
+     *
+     * @throws Exception if the processor cannot be built or the transform fails
+     */
+    @Test
+    public void testCsv2Kv() throws Exception {
+        List<FieldInfo> fields = newFields("ftime", "extinfo");
+        SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", fields);
+        SinkInfo kvSink = new KvSinkInfo("UTF-8", fields);
+        TransformConfig config = new TransformConfig(csvSource, kvSink, SQL_MATCH);
+        // case1
+        TransformProcessor processor1 = new TransformProcessor(config);
+        List<String> output1 = processor1.transform(CSV_RECORD, new HashMap<>());
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(KV_RECORD, output1.get(0));
+        // case2
+        config.setTransformSql(SQL_NO_MATCH);
+        TransformProcessor processor2 = new TransformProcessor(config);
+        List<String> output2 = processor2.transform(CSV_RECORD, new HashMap<>());
+        Assert.assertEquals(0, output2.size());
+    }
+
+    /**
+     * KV source to CSV sink: a matching record is emitted in CSV form, a non-matching
+     * record produces no output.
+     *
+     * @throws Exception if the processor cannot be built or the transform fails
+     */
+    @Test
+    public void testKvCsv() throws Exception {
+        List<FieldInfo> fields = newFields("ftime", "extinfo");
+        SourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
+        SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+        TransformConfig config = new TransformConfig(kvSource, csvSink, SQL_MATCH);
+        // case1
+        TransformProcessor processor1 = new TransformProcessor(config);
+        List<String> output1 = processor1.transform(KV_RECORD, new HashMap<>());
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(CSV_RECORD, output1.get(0));
+        // case2
+        config.setTransformSql(SQL_NO_MATCH);
+        TransformProcessor processor2 = new TransformProcessor(config);
+        List<String> output2 = processor2.transform(KV_RECORD, new HashMap<>());
+        Assert.assertEquals(0, output2.size());
+    }
+
+    /**
+     * Same scenario as {@link #testKvCsv()}, but the processor is built from a JSON
+     * configuration string instead of a {@link TransformConfig} object.
+     *
+     * @throws Exception if the processor cannot be built or the transform fails
+     */
+    @Test
+    public void testKvCsvByJsonConfig() throws Exception {
+        // case1
+        String configString1 = kvToCsvConfigJson(SQL_MATCH);
+        TransformProcessor processor1 = new TransformProcessor(configString1);
+        List<String> output1 = processor1.transform(KV_RECORD, new HashMap<>());
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals(CSV_RECORD, output1.get(0));
+        // case2
+        String configString2 = kvToCsvConfigJson(SQL_NO_MATCH);
+        TransformProcessor processor2 = new TransformProcessor(configString2);
+        List<String> output2 = processor2.transform(KV_RECORD, new HashMap<>());
+        Assert.assertEquals(0, output2.size());
+    }
+
+    /**
+     * Creates one {@link FieldInfo} per name, in the given order.
+     *
+     * @param names field names
+     * @return a new mutable list of fields
+     */
+    private static List<FieldInfo> newFields(String... names) {
+        List<FieldInfo> fields = new ArrayList<>();
+        for (String name : names) {
+            FieldInfo field = new FieldInfo();
+            field.setName(name);
+            fields.add(field);
+        }
+        return fields;
+    }
+
+    /**
+     * Builds the JSON configuration for a KV source and a CSV sink over the fields
+     * {@code ftime} and {@code extinfo}.
+     *
+     * @param transformSql SQL to embed; it is inserted verbatim, so it must not contain
+     *        characters that need JSON escaping
+     * @return the JSON configuration string
+     */
+    private static String kvToCsvConfigJson(String transformSql) {
+        return "{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\","
+                + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
+                + "\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\","
+                + "\"escapeChar\":\"\\\\\","
+                + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
+                + "\"transformSql\":\"" + transformSql + "\"}";
+    }
+}