This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 13481e140d [INLONG-10652][SDK] Inlong Transform support for generics (#10672)
13481e140d is described below
commit 13481e140d2114b375e4b43d6af3f27c58c76e70
Author: vernedeng <[email protected]>
AuthorDate: Fri Jul 19 09:45:37 2024 +0800
[INLONG-10652][SDK] Inlong Transform support for generics (#10672)
* [INLONG-10652][SDK] Inlong Transform support for generics
* fix UT
---
.../sdk/transform/decode/CsvSourceDecoder.java | 2 +-
.../sdk/transform/decode/JsonSourceDecoder.java | 2 +-
.../sdk/transform/decode/KvSourceDecoder.java | 2 +-
.../sdk/transform/decode/PbSourceDecoder.java | 14 +-
.../inlong/sdk/transform/decode/SourceDecoder.java | 5 +-
...ourceDecoder.java => SourceDecoderFactory.java} | 26 ++-
.../sdk/transform/encode/CsvSinkEncoder.java | 2 +-
.../inlong/sdk/transform/encode/KvSinkEncoder.java | 2 +-
.../inlong/sdk/transform/encode/SinkEncoder.java | 4 +-
.../{SinkEncoder.java => SinkEncoderFactory.java} | 18 +-
.../inlong/sdk/transform/pojo/TransformConfig.java | 45 +----
.../sdk/transform/process/TransformProcessor.java | 98 +++--------
.../sdk/transform/pojo/TestTransformConfig.java | 118 -------------
.../TestTransformArithmeticFunctionsProcessor.java | 66 +++++---
.../transform/process/TestTransformProcessor.java | 187 +++++++++++----------
15 files changed, 208 insertions(+), 383 deletions(-)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
index 14a505d422..daddfd36d7 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
@@ -30,7 +30,7 @@ import java.util.Map;
* CsvSourceDecoder
*
*/
-public class CsvSourceDecoder implements SourceDecoder {
+public class CsvSourceDecoder implements SourceDecoder<String> {
protected CsvSourceInfo sourceInfo;
private Charset srcCharset = Charset.defaultCharset();
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
index 57bf6a9982..13c363912a 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
@@ -34,7 +34,7 @@ import java.util.Map;
* JsonSourceDecoder
*
*/
-public class JsonSourceDecoder implements SourceDecoder {
+public class JsonSourceDecoder implements SourceDecoder<String> {
protected JsonSourceInfo sourceInfo;
private Charset srcCharset = Charset.defaultCharset();
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
index 03b40c9f1c..77a4fef8b4 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
@@ -30,7 +30,7 @@ import java.util.Map;
* KvSourceDecoder
*
*/
-public class KvSourceDecoder implements SourceDecoder {
+public class KvSourceDecoder implements SourceDecoder<String> {
protected KvSourceInfo sourceInfo;
private Charset srcCharset = Charset.defaultCharset();
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
index 5ac13cf28f..6c8a919e24 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
@@ -38,7 +38,7 @@ import java.util.concurrent.ConcurrentHashMap;
* PbSourceDecoder
*
*/
-public class PbSourceDecoder implements SourceDecoder {
+public class PbSourceDecoder implements SourceDecoder<byte[]> {
private static final Logger LOG =
LoggerFactory.getLogger(PbSourceDecoder.class);
@@ -150,16 +150,4 @@ public class PbSourceDecoder implements SourceDecoder {
return null;
}
}
-
- /**
- * decode
- * @param srcString
- * @param extParams
- * @return
- */
- @Override
- public SourceData decode(String srcString, Map<String, Object> extParams) {
- byte[] srcBytes = Base64.getDecoder().decode(srcString);
- return this.decode(srcBytes, extParams);
- }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
index a11cd89351..7bbb4dda2d 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
@@ -22,9 +22,10 @@ import java.util.Map;
/**
* SourceDecoder
*/
-public interface SourceDecoder {
+public interface SourceDecoder<Input> {
SourceData decode(byte[] srcBytes, Map<String, Object> extParams);
- SourceData decode(String srcString, Map<String, Object> extParams);
+ SourceData decode(Input input, Map<String, Object> extParams);
+
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
similarity index 51%
copy from
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
index a11cd89351..b29f6f093c 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoderFactory.java
@@ -17,14 +17,26 @@
package org.apache.inlong.sdk.transform.decode;
-import java.util.Map;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
-/**
- * SourceDecoder
- */
-public interface SourceDecoder {
+public class SourceDecoderFactory {
+
+ public static CsvSourceDecoder createCsvDecoder(CsvSourceInfo sourceInfo) {
+ return new CsvSourceDecoder(sourceInfo);
+ }
+
+ public static KvSourceDecoder createKvDecoder(KvSourceInfo sourceInfo) {
+ return new KvSourceDecoder(sourceInfo);
+ }
- SourceData decode(byte[] srcBytes, Map<String, Object> extParams);
+ public static JsonSourceDecoder createJsonDecoder(JsonSourceInfo
sourceInfo) {
+ return new JsonSourceDecoder(sourceInfo);
+ }
- SourceData decode(String srcString, Map<String, Object> extParams);
+ public static PbSourceDecoder createPbDecoder(PbSourceInfo sourceInfo) {
+ return new PbSourceDecoder(sourceInfo);
+ }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
index 4b963c10fb..09cae6ea1b 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
@@ -28,7 +28,7 @@ import java.util.List;
/**
* CsvSinkEncoder
*/
-public class CsvSinkEncoder implements SinkEncoder {
+public class CsvSinkEncoder implements SinkEncoder<String> {
protected CsvSinkInfo sinkInfo;
protected Charset sinkCharset = Charset.defaultCharset();
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
index 503914e80a..be0a7ba980 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
@@ -28,7 +28,7 @@ import java.util.List;
/**
* KvSinkEncoder
*/
-public class KvSinkEncoder implements SinkEncoder {
+public class KvSinkEncoder implements SinkEncoder<String> {
protected KvSinkInfo sinkInfo;
protected Charset sinkCharset = Charset.defaultCharset();
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
index 3839da0160..150f1811f1 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
@@ -24,9 +24,9 @@ import java.util.List;
/**
* SinkEncoder
*/
-public interface SinkEncoder {
+public interface SinkEncoder<Output> {
- String encode(SinkData sinkData);
+ Output encode(SinkData sinkData);
List<FieldInfo> getFields();
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
similarity index 67%
copy from
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
index 3839da0160..f95d19bfca 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
@@ -17,16 +17,16 @@
package org.apache.inlong.sdk.transform.encode;
-import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
-import java.util.List;
+public class SinkEncoderFactory {
-/**
- * SinkEncoder
- */
-public interface SinkEncoder {
-
- String encode(SinkData sinkData);
+ public static CsvSinkEncoder createCsvEncoder(CsvSinkInfo csvSinkInfo) {
+ return new CsvSinkEncoder(csvSinkInfo);
+ }
- List<FieldInfo> getFields();
+ public static KvSinkEncoder createKvEncoder(KvSinkInfo kvSinkInfo) {
+ return new KvSinkEncoder(kvSinkInfo);
+ }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
index ff1ac958fc..71dd71be3b 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/TransformConfig.java
@@ -25,57 +25,14 @@ import com.fasterxml.jackson.annotation.JsonProperty;
*/
public class TransformConfig {
- @JsonProperty("sourceInfo")
- private SourceInfo sourceInfo;
- @JsonProperty("sinkInfo")
- private SinkInfo sinkInfo;
@JsonProperty("transformSql")
private String transformSql;
@JsonCreator
- public TransformConfig(
- @JsonProperty("sourceInfo") SourceInfo sourceInfo,
- @JsonProperty("sinkInfo") SinkInfo sinkInfo,
- @JsonProperty("transformSql") String transformSql) {
- this.sourceInfo = sourceInfo;
- this.sinkInfo = sinkInfo;
+ public TransformConfig(@JsonProperty("transformSql") String transformSql) {
this.transformSql = transformSql;
}
- /**
- * get sourceInfo
- * @return the sourceInfo
- */
- @JsonProperty("sourceInfo")
- public SourceInfo getSourceInfo() {
- return sourceInfo;
- }
-
- /**
- * set sourceInfo
- * @param sourceInfo the sourceInfo to set
- */
- public void setSourceInfo(SourceInfo sourceInfo) {
- this.sourceInfo = sourceInfo;
- }
-
- /**
- * get sinkInfo
- * @return the sinkInfo
- */
- @JsonProperty("sinkInfo")
- public SinkInfo getSinkInfo() {
- return sinkInfo;
- }
-
- /**
- * set sinkInfo
- * @param sinkInfo the sinkInfo to set
- */
- public void setSinkInfo(SinkInfo sinkInfo) {
- this.sinkInfo = sinkInfo;
- }
-
/**
* get transformSql
* @return the transformSql
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index c3ee9270cc..0e74180932 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -17,46 +17,28 @@
package org.apache.inlong.sdk.transform.process;
-import org.apache.inlong.sdk.transform.decode.CsvSourceDecoder;
-import org.apache.inlong.sdk.transform.decode.JsonSourceDecoder;
-import org.apache.inlong.sdk.transform.decode.KvSourceDecoder;
-import org.apache.inlong.sdk.transform.decode.PbSourceDecoder;
import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.decode.SourceDecoder;
-import org.apache.inlong.sdk.transform.encode.CsvSinkEncoder;
import org.apache.inlong.sdk.transform.encode.DefaultSinkData;
-import org.apache.inlong.sdk.transform.encode.KvSinkEncoder;
import org.apache.inlong.sdk.transform.encode.SinkData;
import org.apache.inlong.sdk.transform.encode.SinkEncoder;
-import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
-import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
-import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
-import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
-import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
-import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
-import org.apache.inlong.sdk.transform.pojo.SinkInfo;
-import org.apache.inlong.sdk.transform.pojo.SourceInfo;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
import org.apache.inlong.sdk.transform.process.operator.ExpressionOperator;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserManager;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.StringReader;
-import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -67,65 +49,40 @@ import java.util.Map.Entry;
* TransformProcessor
*
*/
-public class TransformProcessor {
+public class TransformProcessor<I, O> {
private static final Logger LOG =
LoggerFactory.getLogger(TransformProcessor.class);
+ private static final Map<String, Object> EMPTY_EXT_PARAMS =
ImmutableMap.of();
+
private TransformConfig config;
- private SourceDecoder decoder;
- private SinkEncoder encoder;
- private Charset srcCharset = Charset.defaultCharset();
- protected Charset sinkCharset = Charset.defaultCharset();
+ private SourceDecoder<I> decoder;
+ private SinkEncoder<O> encoder;
private PlainSelect transformSelect;
private ExpressionOperator where;
private Map<String, ValueParser> selectItemMap;
- private ObjectMapper objectMapper = new ObjectMapper();
-
- public TransformProcessor(String configString)
- throws JsonMappingException, JsonProcessingException,
JSQLParserException {
- TransformConfig config = this.objectMapper.readValue(configString,
TransformConfig.class);
- this.init(config);
- }
-
- public TransformProcessor(TransformConfig config) throws
JSQLParserException {
- this.init(config);
+ public static <I, O> TransformProcessor<I, O> create(
+ TransformConfig config,
+ SourceDecoder<I> decoder,
+ SinkEncoder<O> encoder) throws JSQLParserException {
+ return new TransformProcessor<>(config, decoder, encoder);
}
- private void init(TransformConfig config) throws JSQLParserException {
+ private TransformProcessor(
+ TransformConfig config,
+ SourceDecoder<I> decoder,
+ SinkEncoder<O> encoder)
+ throws JSQLParserException {
this.config = config;
- if (!StringUtils.isBlank(config.getSourceInfo().getCharset())) {
- this.srcCharset =
Charset.forName(config.getSourceInfo().getCharset());
- }
- if (!StringUtils.isBlank(config.getSinkInfo().getCharset())) {
- this.sinkCharset =
Charset.forName(config.getSinkInfo().getCharset());
- }
- this.initDecoder(config);
- this.initEncoder(config);
- this.initTransformSql();
+ this.decoder = decoder;
+ this.encoder = encoder;
+ this.init();
}
- private void initDecoder(TransformConfig config) {
- SourceInfo sourceInfo = config.getSourceInfo();
- if (sourceInfo instanceof CsvSourceInfo) {
- this.decoder = new CsvSourceDecoder((CsvSourceInfo) sourceInfo);
- } else if (sourceInfo instanceof KvSourceInfo) {
- this.decoder = new KvSourceDecoder((KvSourceInfo) sourceInfo);
- } else if (sourceInfo instanceof JsonSourceInfo) {
- this.decoder = new JsonSourceDecoder((JsonSourceInfo) sourceInfo);
- } else if (sourceInfo instanceof PbSourceInfo) {
- this.decoder = new PbSourceDecoder((PbSourceInfo) sourceInfo);
- }
- }
-
- private void initEncoder(TransformConfig config) {
- SinkInfo sinkInfo = config.getSinkInfo();
- if (sinkInfo instanceof CsvSinkInfo) {
- this.encoder = new CsvSinkEncoder((CsvSinkInfo) sinkInfo);
- } else if (sinkInfo instanceof KvSinkInfo) {
- this.encoder = new KvSinkEncoder((KvSinkInfo) sinkInfo);
- }
+ private void init() throws JSQLParserException {
+ this.initTransformSql();
}
private void initTransformSql() throws JSQLParserException {
@@ -157,12 +114,16 @@ public class TransformProcessor {
}
}
- public List<String> transform(byte[] srcBytes, Map<String, Object>
extParams) {
- SourceData sourceData = this.decoder.decode(srcBytes, extParams);
+ public List<O> transform(I input) {
+ return this.transform(input, EMPTY_EXT_PARAMS);
+ }
+
+ public List<O> transform(I input, Map<String, Object> extParams) {
+ SourceData sourceData = this.decoder.decode(input, extParams);
if (sourceData == null) {
return null;
}
- List<String> sinkDatas = new ArrayList<>(sourceData.getRowCount());
+ List<O> sinkDatas = new ArrayList<>(sourceData.getRowCount());
for (int i = 0; i < sourceData.getRowCount(); i++) {
if (this.where != null && !this.where.check(sourceData, i)) {
continue;
@@ -183,7 +144,4 @@ public class TransformProcessor {
return sinkDatas;
}
- public List<String> transform(String srcString, Map<String, Object>
extParams) {
- return this.transform(srcString.getBytes(this.srcCharset), extParams);
- }
}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java
deleted file mode 100644
index c26b3f9cca..0000000000
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.transform.pojo;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * TestTransformConfig
- *
- */
-public class TestTransformConfig {
-
- @Test
- public void testCsv() {
- try {
- FieldInfo ftime = new FieldInfo();
- ftime.setName("ftime");
- List<FieldInfo> fields = new ArrayList<>();
- fields.add(ftime);
- SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\",
fields);
- SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
- String transformSql = "select ftime from source";
- TransformConfig config = new TransformConfig(csvSource, csvSink,
transformSql);
- ObjectMapper objectMapper = new ObjectMapper();
- String configString = objectMapper.writeValueAsString(config);
- System.out.println(configString);
- Assert.assertEquals(configString,
"{\"sourceInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\","
- +
"\"delimiter\":\"|\",\"escapeChar\":\"\\\\\",\"fields\":[{\"name\":\"ftime\"}]},"
- + "\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\","
- +
"\"delimiter\":\"|\",\"escapeChar\":\"\\\\\",\"fields\":[{\"name\":\"ftime\"}]},"
- + "\"transformSql\":\"select ftime from source\"}");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Test
- public void testKv() {
- try {
- FieldInfo ftime = new FieldInfo();
- ftime.setName("ftime");
- List<FieldInfo> fields = new ArrayList<>();
- fields.add(ftime);
- SourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
- SinkInfo kvSink = new KvSinkInfo("UTF-8", fields);
- String transformSql = "select ftime from source";
- TransformConfig config = new TransformConfig(kvSource, kvSink,
transformSql);
- ObjectMapper objectMapper = new ObjectMapper();
- String configString = objectMapper.writeValueAsString(config);
- System.out.println(configString);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Test
- public void testPb() {
- try {
- FieldInfo ftime = new FieldInfo();
- ftime.setName("ftime");
- List<FieldInfo> fields = new ArrayList<>();
- fields.add(ftime);
- String transformBase64 =
"CrcCCg90cmFuc2Zvcm0ucHJvdG8SBHRlc3QirQEKClNka01lc3NhZ2USEAoDbXNnGAEgASgMUg"
- +
"Ntc2cSGAoHbXNnVGltZRgCIAEoA1IHbXNnVGltZRI3CgdleHRpbmZvGAMgAygLMh0udGVzdC5TZGtNZXNzYWdlLk"
- +
"V4dGluZm9FbnRyeVIHZXh0aW5mbxo6CgxFeHRpbmZvRW50cnkSEAoDa2V5GAEgASgJUgNrZXkSFAoFdmFsdWUY"
- +
"AiABKAlSBXZhbHVlOgI4ASJmCg5TZGtEYXRhUmVxdWVzdBIQCgNzaWQYASABKAlSA3NpZBIkCgRtc2dzGAIgAygLMh"
- +
"AudGVzdC5TZGtNZXNzYWdlUgRtc2dzEhwKCXBhY2thZ2VJRBgDIAEoBFIJcGFja2FnZUlEYgZwcm90bzM=";
- SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64,
"SdkDataRequest", "msgs");
- SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
- String transformSql = "select ftime from source";
- TransformConfig config = new TransformConfig(pbSource, csvSink,
transformSql);
- ObjectMapper objectMapper = new ObjectMapper();
- String configString = objectMapper.writeValueAsString(config);
- System.out.println(configString);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Test
- public void testJson() {
- try {
- FieldInfo ftime = new FieldInfo();
- ftime.setName("ftime");
- List<FieldInfo> fields = new ArrayList<>();
- fields.add(ftime);
- SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "root");
- SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
- String transformSql = "select ftime from source";
- TransformConfig config = new TransformConfig(jsonSource, csvSink,
transformSql);
- ObjectMapper objectMapper = new ObjectMapper();
- String configString = objectMapper.writeValueAsString(config);
- System.out.println(configString);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java
index b291d2b685..488095459c 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java
@@ -17,11 +17,11 @@
package org.apache.inlong.sdk.transform.process;
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
-import org.apache.inlong.sdk.transform.pojo.SinkInfo;
-import org.apache.inlong.sdk.transform.pojo.SourceInfo;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
import org.junit.Assert;
@@ -39,8 +39,8 @@ public class TestTransformArithmeticFunctionsProcessor {
private static final List<FieldInfo> srcFields = new ArrayList<>();
private static final List<FieldInfo> dstFields = new ArrayList<>();
- private static final SourceInfo csvSource;
- private static final SinkInfo kvSink;
+ private static final CsvSourceInfo csvSource;
+ private static final KvSinkInfo kvSink;
static {
for (int i = 1; i < 5; i++) {
FieldInfo field = new FieldInfo();
@@ -57,9 +57,11 @@ public class TestTransformArithmeticFunctionsProcessor {
@Test
public void testPowerFunction() throws Exception {
String transformSql = "select power(numeric1, numeric2) from source";
- TransformConfig config = new TransformConfig(csvSource, kvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1: 2^4
- TransformProcessor processor = new TransformProcessor(config);
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
List<String> output1 = processor.transform("2|4|6|8", new HashMap<>());
Assert.assertEquals(1, output1.size());
Assert.assertEquals(output1.get(0), "result=16.0");
@@ -76,9 +78,11 @@ public class TestTransformArithmeticFunctionsProcessor {
@Test
public void testAbsFunction() throws Exception {
String transformSql = "select abs(numeric1) from source";
- TransformConfig config = new TransformConfig(csvSource, kvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1: |2|
- TransformProcessor processor = new TransformProcessor(config);
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
List<String> output1 = processor.transform("2|4|6|8", new HashMap<>());
Assert.assertEquals(1, output1.size());
Assert.assertEquals(output1.get(0), "result=2");
@@ -91,9 +95,11 @@ public class TestTransformArithmeticFunctionsProcessor {
@Test
public void testSqrtFunction() throws Exception {
String transformSql = "select sqrt(numeric1) from source";
- TransformConfig config = new TransformConfig(csvSource, kvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1: sqrt(9)
- TransformProcessor processor = new TransformProcessor(config);
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
List<String> output1 = processor.transform("9|4|6|8", new HashMap<>());
Assert.assertEquals(1, output1.size());
Assert.assertEquals(output1.get(0), "result=3.0");
@@ -106,9 +112,11 @@ public class TestTransformArithmeticFunctionsProcessor {
@Test
public void testLnFunction() throws Exception {
String transformSql = "select ln(numeric1) from source";
- TransformConfig config = new TransformConfig(csvSource, kvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1: ln(1)
- TransformProcessor processor = new TransformProcessor(config);
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
List<String> output1 = processor.transform("1|4|6|8", new HashMap<>());
Assert.assertEquals(1, output1.size());
Assert.assertEquals(output1.get(0), "result=0.0");
@@ -121,9 +129,11 @@ public class TestTransformArithmeticFunctionsProcessor {
@Test
public void testLog10Function() throws Exception {
String transformSql = "select log10(numeric1) from source";
- TransformConfig config = new TransformConfig(csvSource, kvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1: log10(1)
- TransformProcessor processor = new TransformProcessor(config);
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
List<String> output1 = processor.transform("1|4|6|8", new HashMap<>());
Assert.assertEquals(1, output1.size());
Assert.assertEquals(output1.get(0), "result=0.0");
@@ -136,9 +146,11 @@ public class TestTransformArithmeticFunctionsProcessor {
@Test
public void testLog2Function() throws Exception {
String transformSql = "select log2(numeric1) from source";
- TransformConfig config = new TransformConfig(csvSource, kvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1: log2(1)
- TransformProcessor processor = new TransformProcessor(config);
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
List<String> output1 = processor.transform("1|4|6|8", new HashMap<>());
Assert.assertEquals(1, output1.size());
Assert.assertEquals(output1.get(0), "result=0.0");
@@ -151,21 +163,27 @@ public class TestTransformArithmeticFunctionsProcessor {
@Test
public void testLogFunction() throws Exception {
String transformSql1 = "select log(numeric1) from source";
- TransformConfig config1 = new TransformConfig(csvSource, kvSink,
transformSql1);
+ TransformConfig config1 = new TransformConfig(transformSql1);
// case1: ln(1)
- TransformProcessor processor1 = new TransformProcessor(config1);
+ TransformProcessor<String, String> processor1 = TransformProcessor
+ .create(config1,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
List<String> output1 = processor1.transform("1|4|6|8", new
HashMap<>());
Assert.assertEquals(1, output1.size());
Assert.assertEquals(output1.get(0), "result=0.0");
String transformSql2 = "select log(numeric1, numeric2) from source";
- TransformConfig config2 = new TransformConfig(csvSource, kvSink,
transformSql2);
+ TransformConfig config2 = new TransformConfig(transformSql2);
// case2: log2(8)
- TransformProcessor processor2 = new TransformProcessor(config2);
+ TransformProcessor<String, String> processor2 = TransformProcessor
+ .create(config2,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
List<String> output2 = processor2.transform("2|8|6|8", new
HashMap<>());
Assert.assertEquals(1, output2.size());
Assert.assertEquals(output2.get(0), "result=3.0");
// case3: log10(100)
- TransformProcessor processor3 = new TransformProcessor(config2);
+ TransformProcessor<String, String> processor3 = TransformProcessor
+ .create(config2,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
List<String> output3 = processor3.transform("10|100|6|8", new
HashMap<>());
Assert.assertEquals(1, output3.size());
Assert.assertEquals(output3.get(0), "result=2.0");
@@ -174,9 +192,11 @@ public class TestTransformArithmeticFunctionsProcessor {
@Test
public void testExpFunction() throws Exception {
String transformSql = "select exp(numeric1) from source";
- TransformConfig config = new TransformConfig(csvSource, kvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1: e^0
- TransformProcessor processor = new TransformProcessor(config);
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
List<String> output1 = processor.transform("0|4|6|8", new HashMap<>());
Assert.assertEquals(1, output1.size());
Assert.assertEquals(output1.get(0), "result=1.0");
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
index b76dde6f7f..93c9448999 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sdk.transform.process;
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
@@ -24,8 +26,6 @@ import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
-import org.apache.inlong.sdk.transform.pojo.SinkInfo;
-import org.apache.inlong.sdk.transform.pojo.SourceInfo;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
import org.junit.Assert;
@@ -51,38 +51,48 @@ public class TestTransformProcessor {
FieldInfo extinfo = new FieldInfo();
extinfo.setName("extinfo");
fields.add(extinfo);
- SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", fields);
- SinkInfo kvSink = new KvSinkInfo("UTF-8", fields);
+ CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\",
fields);
+ KvSinkInfo kvSink = new KvSinkInfo("UTF-8", fields);
String transformSql = "select ftime,extinfo from source where
extinfo='ok'";
- TransformConfig config = new TransformConfig(csvSource, kvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1
- TransformProcessor processor1 = new TransformProcessor(config);
+ TransformProcessor<String, String> processor1 = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+
List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok",
new HashMap<>());
- Assert.assertTrue(output1.size() == 1);
+ Assert.assertEquals(1, output1.size());
Assert.assertEquals(output1.get(0), "ftime=2024-04-28
00:00:00&extinfo=ok");
// case2
config.setTransformSql("select ftime,extinfo from source where
extinfo!='ok'");
- TransformProcessor processor2 = new TransformProcessor(config);
+ TransformProcessor<String, String> processor2 = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+
List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok",
new HashMap<>());
- Assert.assertTrue(output2.size() == 0);
+ Assert.assertEquals(0, output2.size());
}
@Test
public void testCsv2KvNoField() throws Exception {
- SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", null);
- SinkInfo kvSink = new KvSinkInfo("UTF-8", null);
+ CsvSourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", null);
+ KvSinkInfo kvSink = new KvSinkInfo("UTF-8", null);
String transformSql = "select $1 ftime,$2 extinfo from source where
$2='ok'";
- TransformConfig config = new TransformConfig(csvSource, kvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1
- TransformProcessor processor1 = new TransformProcessor(config);
+ TransformProcessor<String, String> processor1 = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
List<String> output1 = processor1.transform("2024-04-28 00:00:00|ok",
new HashMap<>());
- Assert.assertTrue(output1.size() == 1);
+ Assert.assertEquals(1, output1.size());
Assert.assertEquals(output1.get(0), "ftime=2024-04-28
00:00:00&extinfo=ok");
// case2
config.setTransformSql("select $1 ftime,$2 extinfo from source where
$2!='ok'");
- TransformProcessor processor2 = new TransformProcessor(config);
+ TransformProcessor<String, String> processor2 = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
List<String> output2 = processor2.transform("2024-04-28 00:00:00|ok",
new HashMap<>());
- Assert.assertTrue(output2.size() == 0);
+ Assert.assertEquals(0, output2.size());
}
@Test
@@ -94,49 +104,59 @@ public class TestTransformProcessor {
FieldInfo extinfo = new FieldInfo();
extinfo.setName("extinfo");
fields.add(extinfo);
- SourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
- SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+ KvSourceInfo kvSource = new KvSourceInfo("UTF-8", fields);
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
String transformSql = "select ftime,extinfo from source where
extinfo='ok'";
- TransformConfig config = new TransformConfig(kvSource, csvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1
- TransformProcessor processor1 = new TransformProcessor(config);
+ TransformProcessor<String, String> processor1 = TransformProcessor
+ .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
List<String> output1 = processor1.transform("ftime=2024-04-28
00:00:00&extinfo=ok", new HashMap<>());
- Assert.assertTrue(output1.size() == 1);
+ Assert.assertEquals(1, output1.size());
Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
// case2
config.setTransformSql("select ftime,extinfo from source where
extinfo!='ok'");
- TransformProcessor processor2 = new TransformProcessor(config);
+ TransformProcessor<String, String> processor2 = TransformProcessor
+ .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
List<String> output2 = processor2.transform("ftime=2024-04-28
00:00:00&extinfo=ok", new HashMap<>());
- Assert.assertTrue(output2.size() == 0);
+ Assert.assertEquals(0, output2.size());
}
@Test
public void testKv2CsvNoField() throws Exception {
- SourceInfo kvSource = new KvSourceInfo("UTF-8", null);
- SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", null);
+ KvSourceInfo kvSource = new KvSourceInfo("UTF-8", null);
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", null);
String transformSql = "select ftime,extinfo from source where
extinfo='ok'";
- TransformConfig config = new TransformConfig(kvSource, csvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1
- TransformProcessor processor1 = new TransformProcessor(config);
+ TransformProcessor<String, String> processor1 = TransformProcessor
+ .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
List<String> output1 = processor1.transform("ftime=2024-04-28
00:00:00&extinfo=ok", new HashMap<>());
- Assert.assertTrue(output1.size() == 1);
+ Assert.assertEquals(1, output1.size());
Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
// case2
config.setTransformSql("select ftime,extinfo from source where
extinfo!='ok'");
- TransformProcessor processor2 = new TransformProcessor(config);
+ TransformProcessor<String, String> processor2 = TransformProcessor
+ .create(config, SourceDecoderFactory.createKvDecoder(kvSource),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
List<String> output2 = processor2.transform("ftime=2024-04-28
00:00:00&extinfo=ok", new HashMap<>());
- Assert.assertTrue(output2.size() == 0);
+ Assert.assertEquals(0, output2.size());
}
@Test
public void testJson2Csv() throws Exception {
List<FieldInfo> fields = this.getTestFieldList();
- SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "msgs");
- SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+ JsonSourceInfo jsonSource = new JsonSourceInfo("UTF-8", "msgs");
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
String transformSql = "select
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
- TransformConfig config = new TransformConfig(jsonSource, csvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1
- TransformProcessor processor = new TransformProcessor(config);
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createJsonDecoder(jsonSource),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
String srcString = "{\n"
+ " \"sid\":\"value1\",\n"
+ " \"packageID\":\"value2\",\n"
@@ -146,7 +166,7 @@ public class TestTransformProcessor {
+ " ]\n"
+ "}";
List<String> output = processor.transform(srcString, new HashMap<>());
- Assert.assertTrue(output.size() == 2);
+ Assert.assertEquals(2, output.size());
Assert.assertEquals(output.get(0),
"value1|value2|1713243918000|value4");
Assert.assertEquals(output.get(1), "value1|value2|1713243918000|v4");
}
@@ -154,12 +174,14 @@ public class TestTransformProcessor {
@Test
public void testJson2CsvForOne() throws Exception {
List<FieldInfo> fields = this.getTestFieldList();
- SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "");
- SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+ JsonSourceInfo jsonSource = new JsonSourceInfo("UTF-8", "");
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
String transformSql = "select
$root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
- TransformConfig config = new TransformConfig(jsonSource, csvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1
- TransformProcessor processor = new TransformProcessor(config);
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createJsonDecoder(jsonSource),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
String srcString = "{\n"
+ " \"sid\":\"value1\",\n"
+ " \"packageID\":\"value2\",\n"
@@ -169,48 +191,25 @@ public class TestTransformProcessor {
+ " ]\n"
+ "}";
List<String> output = processor.transform(srcString, new HashMap<>());
- Assert.assertTrue(output.size() == 1);
+ Assert.assertEquals(1, output.size());
Assert.assertEquals(output.get(0),
"value1|value2|1713243918000|value4");
}
- @Test
- public void testKvCsvByJsonConfig() throws Exception {
- String configString1 =
"{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\","
- + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
- +
"\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\","
- + "\"escapeChar\":\"\\\\\","
- + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
- + "\"transformSql\":\"select ftime,extinfo from source where
extinfo='ok'\"}";
- // case1
- TransformProcessor processor1 = new TransformProcessor(configString1);
- List<String> output1 = processor1.transform("ftime=2024-04-28
00:00:00&extinfo=ok", new HashMap<>());
- Assert.assertTrue(output1.size() == 1);
- Assert.assertEquals(output1.get(0), "2024-04-28 00:00:00|ok");
- // case2
- String configString2 =
"{\"sourceInfo\":{\"type\":\"kv\",\"charset\":\"UTF-8\","
- + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
- +
"\"sinkInfo\":{\"type\":\"csv\",\"charset\":\"UTF-8\",\"delimiter\":\"|\","
- + "\"escapeChar\":\"\\\\\","
- + "\"fields\":[{\"name\":\"ftime\"},{\"name\":\"extinfo\"}]},"
- + "\"transformSql\":\"select ftime,extinfo from source where
extinfo!='ok'\"}";
- TransformProcessor processor2 = new TransformProcessor(configString2);
- List<String> output2 = processor2.transform("ftime=2024-04-28
00:00:00&extinfo=ok", new HashMap<>());
- Assert.assertTrue(output2.size() == 0);
- }
-
@Test
public void testPb2Csv() throws Exception {
List<FieldInfo> fields = this.getTestFieldList();
String transformBase64 = this.getPbTestDescription();
- SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64,
"SdkDataRequest", "msgs");
- SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+ PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64,
"SdkDataRequest", "msgs");
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
String transformSql = "select
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
- TransformConfig config = new TransformConfig(pbSource, csvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1
- TransformProcessor processor = new TransformProcessor(config);
+ TransformProcessor<byte[], String> processor = TransformProcessor
+ .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
byte[] srcBytes = this.getPbTestData();
- List<String> output = processor.transform(srcBytes, new HashMap<>());
- Assert.assertTrue(output.size() == 2);
+ List<String> output = processor.transform(srcBytes);
+ Assert.assertEquals(2, output.size());
Assert.assertEquals(output.get(0), "sid|1|1713243918000|msgValue4");
Assert.assertEquals(output.get(1), "sid|1|1713243918002|msgValue42");
}
@@ -264,15 +263,17 @@ public class TestTransformProcessor {
public void testPb2CsvForOne() throws Exception {
List<FieldInfo> fields = this.getTestFieldList();
String transformBase64 = this.getPbTestDescription();
- SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64,
"SdkDataRequest", null);
- SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+ PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64,
"SdkDataRequest", null);
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
String transformSql = "select
$root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
- TransformConfig config = new TransformConfig(pbSource, csvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1
- TransformProcessor processor = new TransformProcessor(config);
+ TransformProcessor<byte[], String> processor = TransformProcessor
+ .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
byte[] srcBytes = this.getPbTestData();
List<String> output = processor.transform(srcBytes, new HashMap<>());
- Assert.assertTrue(output.size() == 1);
+ Assert.assertEquals(1, output.size());
Assert.assertEquals(output.get(0), "sid|1|1713243918002|msgValue4");
}
@@ -280,8 +281,8 @@ public class TestTransformProcessor {
public void testPb2CsvForAdd() throws Exception {
List<FieldInfo> fields = this.getTestFieldList();
String transformBase64 = this.getPbTestDescription();
- SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64,
"SdkDataRequest", null);
- SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+ PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64,
"SdkDataRequest", null);
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
String transformSql = "select $root.sid,"
+
"($root.msgs(1).msgTime-$root.msgs(0).msgTime)/$root.packageID field2,"
+
"$root.packageID*($root.msgs(0).msgTime*$root.packageID+$root.msgs(1).msgTime/$root.packageID)"
@@ -289,12 +290,14 @@ public class TestTransformProcessor {
+ "$root.msgs(0).msg field4 from source "
+ "where
$root.packageID<($root.msgs(0).msgTime+$root.msgs(1).msgTime"
+ "+$root.msgs(0).msgTime+$root.msgs(1).msgTime)";
- TransformConfig config = new TransformConfig(pbSource, csvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1
- TransformProcessor processor = new TransformProcessor(config);
+ TransformProcessor<byte[], String> processor = TransformProcessor
+ .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
byte[] srcBytes = this.getPbTestData();
List<String> output = processor.transform(srcBytes, new HashMap<>());
- Assert.assertTrue(output.size() == 1);
+ Assert.assertEquals(1, output.size());
Assert.assertEquals(output.get(0), "sid|2|3426487836002|msgValue4");
}
@@ -302,13 +305,15 @@ public class TestTransformProcessor {
public void testPb2CsvForConcat() throws Exception {
List<FieldInfo> fields = this.getTestFieldList();
String transformBase64 = this.getPbTestDescription();
- SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64,
"SdkDataRequest", "msgs");
- SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+ PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64,
"SdkDataRequest", "msgs");
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
String transformSql = "select
$root.sid,$root.packageID,$child.msgTime,"
+ "concat($root.sid,$root.packageID,$child.msgTime,$child.msg)
msg,$root.msgs.msgTime.msg from source";
- TransformConfig config = new TransformConfig(pbSource, csvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1
- TransformProcessor processor = new TransformProcessor(config);
+ TransformProcessor<byte[], String> processor = TransformProcessor
+ .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
byte[] srcBytes = this.getPbTestData();
List<String> output = processor.transform(srcBytes, new HashMap<>());
Assert.assertTrue(output.size() == 2);
@@ -320,14 +325,16 @@ public class TestTransformProcessor {
public void testPb2CsvForNow() throws Exception {
List<FieldInfo> fields = this.getTestFieldList();
String transformBase64 = this.getPbTestDescription();
- SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64,
"SdkDataRequest", "msgs");
- SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+ PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64,
"SdkDataRequest", "msgs");
+ CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
String transformSql = "select now() from source";
- TransformConfig config = new TransformConfig(pbSource, csvSink,
transformSql);
+ TransformConfig config = new TransformConfig(transformSql);
// case1
- TransformProcessor processor = new TransformProcessor(config);
+ TransformProcessor<byte[], String> processor = TransformProcessor
+ .create(config, SourceDecoderFactory.createPbDecoder(pbSource),
+ SinkEncoderFactory.createCsvEncoder(csvSink));
byte[] srcBytes = this.getPbTestData();
List<String> output = processor.transform(srcBytes, new HashMap<>());
- Assert.assertTrue(output.size() == 2);
+ Assert.assertEquals(2, output.size());
}
}