This is an automated email from the ASF dual-hosted git repository.
emhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 850b922737 [INLONG-9476][Sort] Add custom function for data time
transform (#9483)
850b922737 is described below
commit 850b922737e5c822245feebb1a9bcdf279219061
Author: Sting <[email protected]>
AuthorDate: Mon Dec 18 16:24:11 2023 +0800
[INLONG-9476][Sort] Add custom function for data time transform (#9483)
* [InLong-9476][Sort] Add custom function for data time transform
* [InLong-9476][Sort] Add custom function for data time transform
* change return type to string
* fix comment
* fix standalone ut test
* make zoneid default
* make zoneid default
* add support for different format and test for it
* format check
* make formatters private
---
.../service/cluster/InlongClusterServiceTest.java | 1 +
.../sort/function/RoundTimestampFunction.java | 76 ++++++++++++++++++++
.../inlong/sort/parser/impl/FlinkSqlParser.java | 2 +
.../sort/parser/impl/NativeFlinkSqlParser.java | 2 +
.../sort/function/JsonGetterFunctionTest.java | 2 +-
...onTest.java => RoundTimestampFunctionTest.java} | 81 +++++++++++++---------
6 files changed, 130 insertions(+), 34 deletions(-)
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index 353a244dfb..870b7e967e 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -332,6 +332,7 @@ public class InlongClusterServiceTest extends
ServiceBaseTest {
ClusterInfo info = clusterService.get(id, GLOBAL_OPERATOR);
Assertions.assertInstanceOf(SortClsClusterInfo.class, info);
+ this.deleteCluster(id);
}
@Test
diff --git
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RoundTimestampFunction.java
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RoundTimestampFunction.java
new file mode 100644
index 0000000000..fa729b9b97
--- /dev/null
+++
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RoundTimestampFunction.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.function;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Scalar function that truncates an epoch-second timestamp to a multiple of a round time and formats it.
+ */
+public class RoundTimestampFunction extends ScalarFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final Logger LOG =
LoggerFactory.getLogger(RoundTimestampFunction.class);
+ public static final ZoneId DEFAULT_ZONE = ZoneId.systemDefault(); // zone read once, at class initialization
+ private transient Map<String, DateTimeFormatter> formatters; // pattern -> formatter cache, created in open()
+
+ public void open(FunctionContext context) throws Exception {
+ super.open(context);
+ formatters = new HashMap<>();
+ }
+
+ /**
+ * Truncate the timestamp to a multiple of roundTime and format it in {@link #DEFAULT_ZONE}.
+ * For example, if the input timestamp is 1702610371(s), the roundTime is
600(s), and the format is "yyyyMMddmm" (year, month, day, minute; no hour field),
+ * the formatted timestamp is "2023121510" when the truncated instant falls on 2023-12-15 in DEFAULT_ZONE.
+ *
+ * @param timestamp The input timestamp in epoch seconds.
+ * @param roundTime The round time in seconds; (timestamp % roundTime) is subtracted from the timestamp.
+ * @param format The {@link DateTimeFormatter} pattern of the output timestamp.
+ * @return The formatted timestamp, or null if any exception is thrown while rounding or formatting.
+ */
+ public String eval(Long timestamp, Long roundTime, String format) {
+ try {
+ LocalDateTime dateTime = LocalDateTime.ofInstant(
+ Instant.ofEpochSecond(timestamp - timestamp % roundTime), // drop the remainder of the division
+ DEFAULT_ZONE);
+ DateTimeFormatter formatter = formatters.get(format);
+ if (formatter == null) {
+ formatter = DateTimeFormatter.ofPattern(format);
+ formatters.put(format, formatter); // reuse the compiled pattern on later calls
+ }
+ return dateTime.format(formatter);
+ } catch (Exception e) {
+ LOG.error("get formatted timestamp error, timestamp: {},
roundTime: {},format: {}",
+ timestamp, roundTime, format, e);
+ return null;
+ }
+ }
+
+}
diff --git
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 7604549507..61f497d650 100644
---
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -28,6 +28,7 @@ import org.apache.inlong.sort.function.EncryptFunction;
import org.apache.inlong.sort.function.JsonGetterFunction;
import org.apache.inlong.sort.function.RegexpReplaceFirstFunction;
import org.apache.inlong.sort.function.RegexpReplaceFunction;
+import org.apache.inlong.sort.function.RoundTimestampFunction;
import org.apache.inlong.sort.parser.Parser;
import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
import org.apache.inlong.sort.parser.result.ParseResult;
@@ -125,6 +126,7 @@ public class FlinkSqlParser implements Parser {
tableEnv.createTemporarySystemFunction("ENCRYPT",
EncryptFunction.class);
tableEnv.createTemporarySystemFunction("JSON_GETTER",
JsonGetterFunction.class);
tableEnv.createTemporarySystemFunction(DEFAULT_EMBEDDING_FUNCTION_NAME,
EmbeddingFunction.class);
+ tableEnv.createTemporarySystemFunction("ROUND_TIMESTAMP",
RoundTimestampFunction.class);
}
/**
diff --git
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
index 1e254dabba..d7b4a71038 100644
---
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
+++
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
@@ -22,6 +22,7 @@ import org.apache.inlong.sort.function.EncryptFunction;
import org.apache.inlong.sort.function.JsonGetterFunction;
import org.apache.inlong.sort.function.RegexpReplaceFirstFunction;
import org.apache.inlong.sort.function.RegexpReplaceFunction;
+import org.apache.inlong.sort.function.RoundTimestampFunction;
import org.apache.inlong.sort.parser.Parser;
import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
import org.apache.inlong.sort.parser.result.ParseResult;
@@ -74,6 +75,7 @@ public class NativeFlinkSqlParser implements Parser {
tableEnv.createTemporarySystemFunction("ENCRYPT",
EncryptFunction.class);
tableEnv.createTemporarySystemFunction("JSON_GETTER",
JsonGetterFunction.class);
tableEnv.createTemporarySystemFunction(DEFAULT_EMBEDDING_FUNCTION_NAME,
EmbeddingFunction.class);
+ tableEnv.createTemporarySystemFunction("ROUND_TIMESTAMP",
RoundTimestampFunction.class);
}
/**
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
index be7c98dfa1..5c602523d7 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
@@ -46,7 +46,7 @@ public class JsonGetterFunctionTest extends AbstractTestBase {
/**
* Test for JsonGetter function
*
- * @throws Exception The exception may throw when test Encrypt function
+ * @throws Exception The exception may throw when test JsonGetter function
*/
@Test
public void testJsonGetterFunction() throws Exception {
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RoundTimestampFunctionTest.java
similarity index 57%
copy from
inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
copy to
inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RoundTimestampFunctionTest.java
index be7c98dfa1..9fb37c19b0 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RoundTimestampFunctionTest.java
@@ -17,10 +17,6 @@
package org.apache.inlong.sort.function;
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
-
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -39,17 +35,19 @@ import java.util.ArrayList;
import java.util.List;
/**
- * Test for {@link JsonGetterFunction}
+ * Test for {@link RoundTimestampFunction}
*/
-public class JsonGetterFunctionTest extends AbstractTestBase {
+public class RoundTimestampFunctionTest extends AbstractTestBase {
+
+ public static final long TEST_TIMESTAMP = 1702610371L;
/**
- * Test for JsonGetter function
+ * Test for round timestamp function
*
- * @throws Exception The exception may throw when test Encrypt function
+ * @throws Exception The exception may throw when test round timestamp
function
*/
@Test
- public void testJsonGetterFunction() throws Exception {
+ public void testRoundTimestampFunction() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
@@ -58,42 +56,59 @@ public class JsonGetterFunctionTest extends
AbstractTestBase {
env.setParallelism(1);
env.enableCheckpointing(10000);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
- // step 1. Register custom function of json getter
- tableEnv.createTemporaryFunction("JSON_GETTER",
JsonGetterFunction.class);
+
+ // step 1. Register custom function of ROUND_TIMESTAMP
+ tableEnv.createTemporaryFunction("ROUND_TIMESTAMP",
RoundTimestampFunction.class);
+
// step 2. Generate test data and convert to DataStream
+
List<Row> data = new ArrayList<>();
- data.add(Row.of("{\"name\":\"abc\"}"));
- TypeInformation<?>[] types = {
- BasicTypeInfo.STRING_TYPE_INFO};
- String[] names = {"content"};
+ data.add(Row.of(TEST_TIMESTAMP));
+ TypeInformation<?>[] types = {BasicTypeInfo.LONG_TYPE_INFO};
+
+ String[] names = {"f1"};
RowTypeInfo typeInfo = new RowTypeInfo(types, names);
DataStream<Row> dataStream =
env.fromCollection(data).returns(typeInfo);
- // step 3. Convert from DataStream to Table and execute the json
getter function
- Table tempView = tableEnv.fromDataStream(dataStream).as("content");
- tableEnv.createTemporaryView("temp_view", tempView);
+ String formattedTimestamp = "2023121510";
-
org.apache.inlong.sort.protocol.transformation.function.JsonGetterFunction
jsonGetterFunction =
- new
org.apache.inlong.sort.protocol.transformation.function.JsonGetterFunction(
- new FieldInfo("content",
- new StringFormatInfo()),
- new StringConstantParam("name"));
+ // step 3. Convert from DataStream to Table and execute the
ROUND_TIMESTAMP function
+ Table tempView = tableEnv.fromDataStream(dataStream).as("f1");
+ tableEnv.createTemporaryView("temp_view", tempView);
+ Table outputTable = tableEnv.sqlQuery(
+ "SELECT ROUND_TIMESTAMP(f1, 600, 'yyyyMMddmm') " +
+ "from temp_view");
- String sqlQuery = String.format("SELECT %s as content FROM temp_view",
jsonGetterFunction.format());
- Table outputTable = tableEnv.sqlQuery(sqlQuery);
// step 4. Get function execution result and parse it
DataStream<Row> resultSet = tableEnv.toAppendStream(outputTable,
Row.class);
List<String> result = new ArrayList<>();
+ for (CloseableIterator<Row> it = resultSet.executeAndCollect();
it.hasNext();) {
+ Row row = it.next();
+ if (row != null) {
+ result.add(row.getField(0).toString());
+ }
+ }
- for (CloseableIterator<String> it = resultSet.map(s ->
s.getField(0).toString()).executeAndCollect(); it
- .hasNext();) {
- String next = it.next();
- result.add(next);
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0), formattedTimestamp);
+
+ // step 5. provide a different format and check the result
+ Table outputTable2 = tableEnv.sqlQuery(
+ "SELECT ROUND_TIMESTAMP(f1, 600, 'yyyyMMddmmss') " +
+ "from temp_view");
+
+ DataStream<Row> resultSet2 = tableEnv.toAppendStream(outputTable2,
Row.class);
+ List<String> result2 = new ArrayList<>();
+ for (CloseableIterator<Row> it = resultSet2.executeAndCollect();
it.hasNext();) {
+ Row row = it.next();
+ if (row != null) {
+ result2.add(row.getField(0).toString());
+ }
}
- // step 5. Whether the comparison results are as expected
- String expect = "abc";
- Assert.assertEquals(expect, result.get(0));
+ Assert.assertEquals(result2.size(), 1);
+ Assert.assertEquals(result2.get(0), formattedTimestamp + "00");
+
}
-}
+}
\ No newline at end of file