This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4517d378ec [INLONG-11873][SDK] Support parsing field values from extended parameters (#11874)
4517d378ec is described below
commit 4517d378ece1f962f4b03d6b546c5dc9da6a2e80
Author: ChunLiang Lu <[email protected]>
AuthorDate: Tue Jun 3 12:54:22 2025 +0800
[INLONG-11873][SDK] Support parsing field values from extended parameters (#11874)
---
...BsonSourceData.java => AbstractSourceData.java} | 28 +++++++++++++++++-----
.../sdk/transform/decode/AvroSourceData.java | 10 ++++++--
.../sdk/transform/decode/AvroSourceDecoder.java | 12 +++++-----
.../sdk/transform/decode/BsonSourceData.java | 6 +++--
.../inlong/sdk/transform/decode/CsvSourceData.java | 10 ++++++--
.../sdk/transform/decode/CsvSourceDecoder.java | 2 +-
.../sdk/transform/decode/JsonSourceData.java | 12 ++++++++--
.../sdk/transform/decode/JsonSourceDecoder.java | 12 +++++-----
.../inlong/sdk/transform/decode/KvSourceData.java | 10 ++++++--
.../sdk/transform/decode/KvSourceDecoder.java | 2 +-
.../sdk/transform/decode/ParquetSourceData.java | 10 ++++++--
.../sdk/transform/decode/ParquetSourceDecoder.java | 2 +-
.../inlong/sdk/transform/decode/PbSourceData.java | 10 ++++++--
.../sdk/transform/decode/PbSourceDecoder.java | 2 +-
.../sdk/transform/decode/RowDataSourceData.java | 9 +++++--
.../sdk/transform/decode/RowDataSourceDecoder.java | 2 +-
.../inlong/sdk/transform/decode/XmlSourceData.java | 10 ++++++--
.../sdk/transform/decode/XmlSourceDecoder.java | 2 +-
.../sdk/transform/decode/YamlSourceData.java | 2 +-
.../TestCsv2CsvForErrorOrderProcessor.java | 10 +++++---
.../process/processor/TestCsv2KvProcessor.java | 11 +++++----
.../process/processor/TestJson2CsvProcessor.java | 13 ++++++----
.../process/processor/TestKv2CsvProcessor.java | 11 +++++----
23 files changed, 139 insertions(+), 59 deletions(-)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java
similarity index 54%
copy from
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java
index 97100b0240..c7e40282ae 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java
@@ -17,15 +17,31 @@
package org.apache.inlong.sdk.transform.decode;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
+import org.apache.inlong.sdk.transform.process.Context;
/**
- * BsonSourceData
+ * AbstractSourceData
+ *
*/
-public class BsonSourceData extends JsonSourceData {
+public abstract class AbstractSourceData implements SourceData {
- public BsonSourceData(JsonObject root, JsonArray childRoot) {
- super(root, childRoot);
+ public static final String CTX_KEY = "$ctx.";
+
+ protected Context context;
+
+ protected boolean isContextField(String fieldName) {
+ return fieldName.startsWith(CTX_KEY);
+ }
+
+ protected String getContextField(String fieldName) {
+ if (context == null) {
+ return "";
+ }
+ if (!isContextField(fieldName)) {
+ return null;
+ }
+ String realFieldName = fieldName.substring(CTX_KEY.length());
+ String fieldValue = this.context.getStringOrDefault(realFieldName, "");
+ return fieldValue;
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java
index 42705433f4..3b603c1c5c 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sdk.transform.decode;
+import org.apache.inlong.sdk.transform.process.Context;
+
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
@@ -29,7 +31,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-public class AvroSourceData implements SourceData {
+public class AvroSourceData extends AbstractSourceData {
public static final String ROOT_KEY = "$root";
@@ -41,10 +43,11 @@ public class AvroSourceData implements SourceData {
private Charset srcCharset;
- public AvroSourceData(GenericRecord root, List<GenericRecord> childRoot,
Charset srcCharset) {
+ public AvroSourceData(GenericRecord root, List<GenericRecord> childRoot,
Charset srcCharset, Context context) {
this.root = root;
this.childRoot = childRoot;
this.srcCharset = srcCharset;
+ this.context = context;
}
@Override
@@ -59,6 +62,9 @@ public class AvroSourceData implements SourceData {
@Override
public String getField(int rowNum, String fieldName) {
try {
+ if (isContextField(fieldName)) {
+ return getContextField(fieldName);
+ }
List<AvroNode> childNodes = new ArrayList<>();
String[] nodeStrings = fieldName.split("\\.");
for (String nodeString : nodeStrings) {
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java
index 992dea88c6..360a080f76 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java
@@ -74,7 +74,7 @@ public class AvroSourceDecoder extends SourceDecoder<byte[]> {
GenericRecord root = dataFileStream.next();
List<GenericRecord> childRoot = null;
if (CollectionUtils.isEmpty(childNodes)) {
- return new AvroSourceData(root, null, srcCharset);
+ return new AvroSourceData(root, null, srcCharset, context);
}
Object current = root;
@@ -83,12 +83,12 @@ public class AvroSourceDecoder extends
SourceDecoder<byte[]> {
for (AvroNode node : childNodes) {
if (curSchema.getType() != Type.RECORD) {
// error data
- return new AvroSourceData(root, null, srcCharset);
+ return new AvroSourceData(root, null, srcCharset, context);
}
Object newElement = ((GenericRecord)
current).get(node.getName());
if (newElement == null) {
// error data
- return new AvroSourceData(root, null, srcCharset);
+ return new AvroSourceData(root, null, srcCharset, context);
}
// node is not array
if (!node.isArray()) {
@@ -100,15 +100,15 @@ public class AvroSourceDecoder extends
SourceDecoder<byte[]> {
current = getElementFromArray(node, newElement, curSchema);
if (current == null) {
// error data
- return new AvroSourceData(root, null, srcCharset);
+ return new AvroSourceData(root, null, srcCharset, context);
}
}
if (curSchema.getType() != Type.ARRAY) {
// error data
- return new AvroSourceData(root, null, srcCharset);
+ return new AvroSourceData(root, null, srcCharset, context);
}
childRoot = (List<GenericRecord>) current;
- return new AvroSourceData(root, childRoot, srcCharset);
+ return new AvroSourceData(root, childRoot, srcCharset, context);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
return null;
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java
index 97100b0240..7b6d31f066 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sdk.transform.decode;
+import org.apache.inlong.sdk.transform.process.Context;
+
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
@@ -25,7 +27,7 @@ import com.google.gson.JsonObject;
*/
public class BsonSourceData extends JsonSourceData {
- public BsonSourceData(JsonObject root, JsonArray childRoot) {
- super(root, childRoot);
+ public BsonSourceData(JsonObject root, JsonArray childRoot, Context
context) {
+ super(root, childRoot, context);
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
index e0bd9f794c..76442c7bc2 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sdk.transform.decode;
+import org.apache.inlong.sdk.transform.process.Context;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -26,13 +28,14 @@ import java.util.Map;
* CsvSourceData
*
*/
-public class CsvSourceData implements SourceData {
+public class CsvSourceData extends AbstractSourceData {
private List<Map<String, Object>> rows = new ArrayList<>();
private Map<String, Object> currentRow;
- public CsvSourceData() {
+ public CsvSourceData(Context context) {
+ this.context = context;
}
public void putField(String fieldName, Object fieldValue) {
@@ -54,6 +57,9 @@ public class CsvSourceData implements SourceData {
if (rowNum >= this.rows.size()) {
return null;
}
+ if (isContextField(fieldName)) {
+ return getContextField(fieldName);
+ }
Map<String, Object> targetRow = this.rows.get(rowNum);
return targetRow.get(fieldName);
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
index 830d5c2c01..4eb13de398 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
@@ -60,7 +60,7 @@ public class CsvSourceDecoder extends SourceDecoder<String> {
@Override
public SourceData decode(String srcString, Context context) {
String[][] rowValues = SplitUtils.splitCsv(srcString, delimiter,
escapeChar, '\"', '\n', true);
- CsvSourceData sourceData = new CsvSourceData();
+ CsvSourceData sourceData = new CsvSourceData(context);
for (int i = 0; i < rowValues.length; i++) {
String[] fieldValues = rowValues[i];
sourceData.addRow();
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java
index ea9d296a97..dba0608e4b 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sdk.transform.decode;
+import org.apache.inlong.sdk.transform.process.Context;
+
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
@@ -29,7 +31,7 @@ import java.util.List;
* JsonSourceData
*
*/
-public class JsonSourceData implements SourceData {
+public class JsonSourceData extends AbstractSourceData {
public static final String ROOT_KEY = "$root";
@@ -43,10 +45,12 @@ public class JsonSourceData implements SourceData {
* Constructor
* @param root
* @param childRoot
+ * @param context
*/
- public JsonSourceData(JsonObject root, JsonArray childRoot) {
+ public JsonSourceData(JsonObject root, JsonArray childRoot, Context
context) {
this.root = root;
this.childRoot = childRoot;
+ this.context = context;
}
/**
@@ -71,6 +75,10 @@ public class JsonSourceData implements SourceData {
@Override
public String getField(int rowNum, String fieldName) {
try {
+ if (isContextField(fieldName)) {
+ return getContextField(fieldName);
+ }
+ // split field name
List<JsonNode> childNodes = new ArrayList<>();
String[] nodeStrings = fieldName.split("\\.");
for (String nodeString : nodeStrings) {
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
index 0332094090..38db4ad03d 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
@@ -86,18 +86,18 @@ public class JsonSourceDecoder extends
SourceDecoder<String> {
JsonObject root = gson.fromJson(srcString, JsonObject.class);
JsonArray childRoot = null;
if (CollectionUtils.isEmpty(childNodes)) {
- return new JsonSourceData(root, null);
+ return new JsonSourceData(root, null, context);
}
JsonElement current = root;
for (JsonNode node : childNodes) {
if (!current.isJsonObject()) {
// error data
- return new JsonSourceData(root, null);
+ return new JsonSourceData(root, null, context);
}
JsonElement newElement =
current.getAsJsonObject().get(node.getName());
if (newElement == null) {
// error data
- return new JsonSourceData(root, null);
+ return new JsonSourceData(root, null, context);
}
// node is not array
if (!node.isArray()) {
@@ -108,15 +108,15 @@ public class JsonSourceDecoder extends
SourceDecoder<String> {
current = getElementFromArray(node, newElement);
if (current == null) {
// error data
- return new JsonSourceData(root, null);
+ return new JsonSourceData(root, null, context);
}
}
if (!current.isJsonArray()) {
// error data
- return new JsonSourceData(root, null);
+ return new JsonSourceData(root, null, context);
}
childRoot = current.getAsJsonArray();
- return new JsonSourceData(root, childRoot);
+ return new JsonSourceData(root, childRoot, context);
}
private JsonElement getElementFromArray(JsonNode node, JsonElement
curElement) {
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
index e260ae15f4..2349b94e8a 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sdk.transform.decode;
+import org.apache.inlong.sdk.transform.process.Context;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -25,13 +27,14 @@ import java.util.Map;
/**
* KvSourceData
*/
-public class KvSourceData implements SourceData {
+public class KvSourceData extends AbstractSourceData {
private List<Map<String, String>> rows = new ArrayList<>();
private Map<String, String> currentRow;
- public KvSourceData() {
+ public KvSourceData(Context context) {
+ this.context = context;
}
public void putField(String fieldName, String fieldValue) {
@@ -53,6 +56,9 @@ public class KvSourceData implements SourceData {
if (rowNum >= this.rows.size()) {
return null;
}
+ if (isContextField(fieldName)) {
+ return getContextField(fieldName);
+ }
Map<String, String> targetRow = this.rows.get(rowNum);
return targetRow.get(fieldName);
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
index e62fcccc37..049d7fb610 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
@@ -75,7 +75,7 @@ public class KvSourceDecoder extends SourceDecoder<String> {
public SourceData decode(String srcString, Context context) {
List<Map<String, String>> rowValues = KvUtils.splitKv(srcString,
entryDelimiter, kvDelimiter,
escapeChar, quoteChar, lineDelimiter);
- KvSourceData sourceData = new KvSourceData();
+ KvSourceData sourceData = new KvSourceData(context);
if (CollectionUtils.isEmpty(fields)) {
for (Map<String, String> row : rowValues) {
sourceData.addRow();
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceData.java
index ab294d867b..7ff6718bae 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceData.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sdk.transform.decode;
+import org.apache.inlong.sdk.transform.process.Context;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.schema.GroupType;
@@ -29,7 +31,7 @@ import java.nio.charset.Charset;
/**
* ParquetSourceData
*/
-public class ParquetSourceData implements SourceData {
+public class ParquetSourceData extends AbstractSourceData {
private static final Logger LOG =
LoggerFactory.getLogger(ParquetSourceData.class);
@@ -44,7 +46,7 @@ public class ParquetSourceData implements SourceData {
private Type childType;
private int rowCount = -1;
- public ParquetSourceData(Group root, String childPath, Charset srcCharset)
{
+ public ParquetSourceData(Group root, String childPath, Charset srcCharset,
Context context) {
this.rootGroup = root;
String pathStr = "";
if (!StringUtils.isEmpty(childPath)) {
@@ -60,6 +62,7 @@ public class ParquetSourceData implements SourceData {
}
}
this.srcCharset = srcCharset;
+ this.context = context;
}
@Override
public int getRowCount() {
@@ -74,6 +77,9 @@ public class ParquetSourceData implements SourceData {
public String getField(int rowNum, String fieldName) {
String fieldValue = "";
try {
+ if (isContextField(fieldName)) {
+ return getContextField(fieldName);
+ }
if (StringUtils.startsWith(fieldName, ROOT_KEY)) {
// Dealing with multi-level paths
fieldName = fieldName.substring(ROOT_KEY.length());
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
index 85e3ba319d..6f58ee3070 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
@@ -92,7 +92,7 @@ public class ParquetSourceDecoder extends
SourceDecoder<byte[]> {
for (int i = 0; i < rows; i++) {
Group group = recordReader.read();
if (group != null) {
- return new ParquetSourceData(group,
this.childMessagePath, this.srcCharset);
+ return new ParquetSourceData(group,
this.childMessagePath, this.srcCharset, context);
}
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
index 147eaa67d1..d4bea9906e 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sdk.transform.decode;
+import org.apache.inlong.sdk.transform.process.Context;
+
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
@@ -34,7 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
* JsonSourceData
*
*/
-public class PbSourceData implements SourceData {
+public class PbSourceData extends AbstractSourceData {
private static final Logger LOG =
LoggerFactory.getLogger(PbSourceData.class);
@@ -60,13 +62,14 @@ public class PbSourceData implements SourceData {
public PbSourceData(DynamicMessage root, List<DynamicMessage> childRoot,
Descriptors.Descriptor rootDesc, Descriptors.Descriptor childDesc,
Map<String, List<PbNode>> columnNodeMap,
- Charset srcCharset) {
+ Charset srcCharset, Context context) {
this.root = root;
this.childRoot = childRoot;
this.rootDesc = rootDesc;
this.childDesc = childDesc;
this.columnNodeMap = columnNodeMap;
this.srcCharset = srcCharset;
+ this.context = context;
}
/**
@@ -105,6 +108,9 @@ public class PbSourceData implements SourceData {
public String getField(int rowNum, String fieldName) {
String fieldValue = "";
try {
+ if (isContextField(fieldName)) {
+ return getContextField(fieldName);
+ }
if (StringUtils.startsWith(fieldName, ROOT_KEY)) {
fieldValue = this.getRootField(fieldName);
} else if (StringUtils.startsWith(fieldName, CHILD_KEY)) {
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
index 4ee704bd0e..021c625fd4 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
@@ -145,7 +145,7 @@ public class PbSourceDecoder extends SourceDecoder<byte[]> {
}
}
}
- return new PbSourceData(root, childRoot, rootDesc, childDesc,
columnNodeMap, srcCharset);
+ return new PbSourceData(root, childRoot, rootDesc, childDesc,
columnNodeMap, srcCharset, context);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
return null;
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
index fbfd423497..8d7d017e3b 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceData.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sdk.transform.decode;
+import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.utils.RowToFieldDataUtils;
import lombok.extern.slf4j.Slf4j;
@@ -25,7 +26,7 @@ import org.apache.flink.table.data.RowData;
import java.util.Map;
@Slf4j
-public class RowDataSourceData implements SourceData {
+public class RowDataSourceData extends AbstractSourceData {
private final RowData rowData;
private final Map<String, Integer> fieldPositionMap;
@@ -34,10 +35,11 @@ public class RowDataSourceData implements SourceData {
public RowDataSourceData(
RowData rowData,
Map<String, Integer> fieldPositionMap,
- RowToFieldDataUtils.RowFieldConverter[] converters) {
+ RowToFieldDataUtils.RowFieldConverter[] converters, Context
context) {
this.rowData = rowData;
this.fieldPositionMap = fieldPositionMap;
this.converters = converters;
+ this.context = context;
}
@Override
@@ -51,6 +53,9 @@ public class RowDataSourceData implements SourceData {
return null;
}
try {
+ if (isContextField(fieldName)) {
+ return getContextField(fieldName);
+ }
int fieldPosition = fieldPositionMap.get(fieldName);
return converters[fieldPosition].convert(rowData, fieldPosition);
} catch (Throwable e) {
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
index a82fc2ac6b..3565b292a8 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/RowDataSourceDecoder.java
@@ -61,7 +61,7 @@ public class RowDataSourceDecoder extends
SourceDecoder<RowData> {
@Override
public SourceData decode(RowData rowData, Context context) {
- return new RowDataSourceData(rowData, fieldPositionMap,
rowFieldConverters);
+ return new RowDataSourceData(rowData, fieldPositionMap,
rowFieldConverters, context);
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceData.java
index 1e23c2b5d7..7e7fad3b84 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceData.java
@@ -17,13 +17,15 @@
package org.apache.inlong.sdk.transform.decode;
+import org.apache.inlong.sdk.transform.process.Context;
+
import java.util.List;
import java.util.Map;
/**
* XmlSourceData
*/
-public class XmlSourceData implements SourceData {
+public class XmlSourceData extends AbstractSourceData {
public static final String ROOT_KEY = "$root";
@@ -33,7 +35,7 @@ public class XmlSourceData implements SourceData {
private XmlNode childRoot;
- public XmlSourceData(XmlNode root, XmlNode childRoot) {
+ public XmlSourceData(XmlNode root, XmlNode childRoot, Context context) {
this.root = root;
this.childRoot = new XmlNode();
if (childRoot != null) {
@@ -42,6 +44,7 @@ public class XmlSourceData implements SourceData {
this.childRoot = (XmlNode) value;
}
}
+ this.context = context;
}
@Override
@@ -61,6 +64,9 @@ public class XmlSourceData implements SourceData {
@Override
public String getField(int rowNum, String fieldName) {
try {
+ if (isContextField(fieldName)) {
+ return getContextField(fieldName);
+ }
String[] nodeString = fieldName.split("\\.");
Object cur = null, last = null;
int start = -1;
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java
index a9a6bb9a66..625f31aa93 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java
@@ -83,7 +83,7 @@ public class XmlSourceDecoder extends SourceDecoder<String> {
cur = child.getValue();
}
}
- return new XmlSourceData(rootObj, child);
+ return new XmlSourceData(rootObj, child, context);
} catch (Exception e) {
log.error("Data parsing failed", e);
return null;
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java
index a9d3162fd7..fb54d63efa 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceData.java
@@ -20,7 +20,7 @@ package org.apache.inlong.sdk.transform.decode;
import java.util.List;
import java.util.Map;
-public class YamlSourceData implements SourceData {
+public class YamlSourceData extends AbstractSourceData {
public static final String ROOT_KEY = "$root";
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java
index d03d112776..300b5e3c9c 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2CsvForErrorOrderProcessor.java
@@ -33,6 +33,7 @@ import org.junit.Test;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class TestCsv2CsvForErrorOrderProcessor extends
AbstractProcessorTestBase {
@@ -42,16 +43,19 @@ public class TestCsv2CsvForErrorOrderProcessor extends
AbstractProcessorTestBase
CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\',
sourceFields);
List<FieldInfo> sinkFields = this.getTestFieldList("field1", "field2",
"field3");
CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', sinkFields);
- String transformSql = "select ftime as field2,data as field3,extinfo
as field4 from source where extinfo='ok'";
+ String transformSql =
+ "select ftime as field2,data as field3,extinfo as
field4,$ctx.partition as field1 from source where extinfo='ok'";
TransformConfig config = new TransformConfig(transformSql, false);
// case1
TransformProcessor<String, String> processor1 = TransformProcessor
.create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
SinkEncoderFactory.createCsvEncoder(csvSink));
- List<String> output1 = processor1.transform("2024-04-28
00:00:00|ok|data1", new HashMap<>());
+ Map<String, Object> extParams = new HashMap<>();
+ extParams.put("partition", "2024042801");
+ List<String> output1 = processor1.transform("2024-04-28
00:00:00|ok|data1", extParams);
Assert.assertEquals(1, output1.size());
- Assert.assertEquals("|2024-04-28 00:00:00|data1", output1.get(0));
+ Assert.assertEquals("2024042801|2024-04-28 00:00:00|data1",
output1.get(0));
}
@Test
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
index 6b19acb11c..2b57409714 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2KvProcessor.java
@@ -30,24 +30,27 @@ import org.junit.Test;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class TestCsv2KvProcessor extends AbstractProcessorTestBase {
@Test
public void testCsv2Kv() throws Exception {
- List<FieldInfo> fields = this.getTestFieldList("ftime", "extinfo");
+ List<FieldInfo> fields = this.getTestFieldList("ftime", "extinfo",
"ds");
CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", '|', '\\',
fields);
KvSinkInfo kvSink = new KvSinkInfo("UTF-8", fields);
- String transformSql = "select ftime,extinfo from source where
extinfo='ok'";
+ String transformSql = "select ftime,extinfo,$ctx.partition from source
where extinfo='ok'";
TransformConfig config = new TransformConfig(transformSql);
// case1
TransformProcessor<String, String> processor1 = TransformProcessor
.create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
SinkEncoderFactory.createKvEncoder(kvSink));
- List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok",
new HashMap<>());
+ Map<String, Object> extParams = new HashMap<>();
+ extParams.put("partition", "2024042801");
+ List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok",
extParams);
Assert.assertEquals(1, output1.size());
- Assert.assertEquals(output1.get(0), "ftime=2024-04-28
00:00:00&extinfo=ok");
+ Assert.assertEquals(output1.get(0), "ftime=2024-04-28
00:00:00&extinfo=ok&ds=2024042801");
// case2
config.setTransformSql("select ftime,extinfo from source where
extinfo!='ok'");
TransformProcessor<String, String> processor2 = TransformProcessor
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2CsvProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2CsvProcessor.java
index f649c5a8f7..0d45398c2c 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2CsvProcessor.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2CsvProcessor.java
@@ -30,15 +30,16 @@ import org.junit.Test;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class TestJson2CsvProcessor extends AbstractProcessorTestBase {
@Test
public void testJson2Csv() throws Exception {
- List<FieldInfo> fields1 = this.getTestFieldList("sid", "packageID",
"msgTime", "msg");
+ List<FieldInfo> fields1 = this.getTestFieldList("sid", "packageID",
"msgTime", "msg", "ds");
JsonSourceInfo jsonSource1 = new JsonSourceInfo("UTF-8", "msgs");
CsvSinkInfo csvSink1 = new CsvSinkInfo("UTF-8", '|', '\\', fields1);
- String transformSql1 = "select
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
+ String transformSql1 = "select
$root.sid,$root.packageID,$child.msgTime,$child.msg,$ctx.partition from source";
TransformConfig config1 = new TransformConfig(transformSql1);
// case1
TransformProcessor<String, String> processor1 = TransformProcessor
@@ -52,10 +53,12 @@ public class TestJson2CsvProcessor extends
AbstractProcessorTestBase {
+ " {\"msg\":\"v4\",\"msgTime\":1713243918000}\n"
+ " ]\n"
+ "}";
- List<String> output1 = processor1.transform(srcString1, new
HashMap<>());
+ Map<String, Object> extParams = new HashMap<>();
+ extParams.put("partition", "2024042801");
+ List<String> output1 = processor1.transform(srcString1, extParams);
Assert.assertEquals(2, output1.size());
- Assert.assertEquals(output1.get(0),
"value1|value2|1713243918000|value4");
- Assert.assertEquals(output1.get(1), "value1|value2|1713243918000|v4");
+ Assert.assertEquals(output1.get(0),
"value1|value2|1713243918000|value4|2024042801");
+ Assert.assertEquals(output1.get(1),
"value1|value2|1713243918000|v4|2024042801");
// case2
List<FieldInfo> fields2 = this.getTestFieldList("id", "itemId",
"subItemId", "msg");
JsonSourceInfo jsonSource2 = new JsonSourceInfo("UTF-8", "items");
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestKv2CsvProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestKv2CsvProcessor.java
index 82a4762f93..d8766f3e52 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestKv2CsvProcessor.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestKv2CsvProcessor.java
@@ -30,23 +30,26 @@ import org.junit.Test;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class TestKv2CsvProcessor extends AbstractProcessorTestBase {
@Test
public void testKv2Csv() throws Exception {
- List<FieldInfo> fields = this.getTestFieldList("ftime", "extinfo");
+ List<FieldInfo> fields = this.getTestFieldList("ftime", "extinfo",
"ds");
KvSourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields);
- String transformSql = "select ftime,extinfo from source where
extinfo='ok'";
+ String transformSql = "select ftime,extinfo,$ctx.partition from source
where extinfo='ok'";
TransformConfig config = new TransformConfig(transformSql);
// case1
TransformProcessor<String, String> processor1 = TransformProcessor
.create(config, SourceDecoderFactory.createKvDecoder(kvSource),
SinkEncoderFactory.createCsvEncoder(csvSink));
- List<String> output1 = processor1.transform("ftime=2024-04-28
00:00:00&extinfo=ok", new HashMap<>());
+ Map<String, Object> extParams = new HashMap<>();
+ extParams.put("partition", "2024042801");
+ List<String> output1 = processor1.transform("ftime=2024-04-28
00:00:00&extinfo=ok", extParams);
Assert.assertEquals(1, output1.size());
- Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
+ Assert.assertEquals(output1.get(0), "2024-04-28
00:00:00|ok|2024042801");
// case2
config.setTransformSql("select ftime,extinfo from source where
extinfo!='ok'");
TransformProcessor<String, String> processor2 = TransformProcessor