This is an automated email from the ASF dual-hosted git repository.

emhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 850b922737 [INLONG-9476][Sort] Add custom function for data time 
transform (#9483)
850b922737 is described below

commit 850b922737e5c822245feebb1a9bcdf279219061
Author: Sting <[email protected]>
AuthorDate: Mon Dec 18 16:24:11 2023 +0800

    [INLONG-9476][Sort] Add custom function for data time transform (#9483)
    
    * [InLong-9476][Sort] Add custom function for data time transform
    
    * [InLong-9476][Sort] Add custom function for data time transform
    
    * change return type to string
    
    * fix comment
    
    * fix standalone ut test
    
    * make zoneid default
    
    * make zoneid default
    
    * add support for different format and test for it
    
    * format check
    
    * make formatters private
---
 .../service/cluster/InlongClusterServiceTest.java  |  1 +
 .../sort/function/RoundTimestampFunction.java      | 76 ++++++++++++++++++++
 .../inlong/sort/parser/impl/FlinkSqlParser.java    |  2 +
 .../sort/parser/impl/NativeFlinkSqlParser.java     |  2 +
 .../sort/function/JsonGetterFunctionTest.java      |  2 +-
 ...onTest.java => RoundTimestampFunctionTest.java} | 81 +++++++++++++---------
 6 files changed, 130 insertions(+), 34 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index 353a244dfb..870b7e967e 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -332,6 +332,7 @@ public class InlongClusterServiceTest extends 
ServiceBaseTest {
 
         ClusterInfo info = clusterService.get(id, GLOBAL_OPERATOR);
         Assertions.assertInstanceOf(SortClsClusterInfo.class, info);
+        this.deleteCluster(id);
     }
 
     @Test
diff --git 
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RoundTimestampFunction.java
 
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RoundTimestampFunction.java
new file mode 100644
index 0000000000..fa729b9b97
--- /dev/null
+++ 
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RoundTimestampFunction.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.function;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Round timestamp and output formatted timestamp.
+ *
+ * <p>The epoch-second input is rounded down to a multiple of the round time, converted to a local
+ * date-time in {@link #DEFAULT_ZONE} and rendered with the requested {@link DateTimeFormatter} pattern.
+ */
+public class RoundTimestampFunction extends ScalarFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final Logger LOG = LoggerFactory.getLogger(RoundTimestampFunction.class);
+    /** Zone used to render the rounded instant: the default zone of the JVM running the function. */
+    public static final ZoneId DEFAULT_ZONE = ZoneId.systemDefault();
+    /** Compiled patterns keyed by format string; transient, so rebuilt in open() or lazily on first use. */
+    private transient Map<String, DateTimeFormatter> formatters;
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+        formatters = new HashMap<>();
+    }
+
+    /**
+     * Round timestamp and output formatted timestamp.
+     * For example, if the input timestamp is 1702610371 (s), the roundTime is 600 (s), the format is
+     * "yyyyMMddHHmm" and {@link #DEFAULT_ZONE} is UTC+8 (e.g. Asia/Shanghai), the timestamp is rounded
+     * down to 1702609800 and the formatted timestamp is "202312151110".
+     *
+     * @param timestamp The input timestamp in seconds since the epoch; negative values are rounded down too.
+     * @param roundTime The round time in seconds; must be positive.
+     * @param format The {@link DateTimeFormatter} pattern of the output timestamp.
+     * @return The formatted timestamp, or null if any argument is null, roundTime is not positive,
+     *         or the pattern is invalid / formatting fails.
+     */
+    public String eval(Long timestamp, Long roundTime, String format) {
+        // SQL semantics: NULL in, NULL out. This is ordinary data, so it is not logged as an error.
+        if (timestamp == null || roundTime == null || format == null) {
+            return null;
+        }
+        if (roundTime <= 0) {
+            LOG.error("invalid roundTime: {}, it must be a positive number of seconds", roundTime);
+            return null;
+        }
+        try {
+            // floorMod instead of % so that negative (pre-1970) timestamps are rounded down, not up.
+            long rounded = timestamp - Math.floorMod(timestamp, roundTime);
+            LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(rounded), DEFAULT_ZONE);
+            return dateTime.format(getFormatter(format));
+        } catch (Exception e) {
+            LOG.error("get formatted timestamp error, timestamp: {}, roundTime: {}, format: {}",
+                    timestamp, roundTime, format, e);
+            return null;
+        }
+    }
+
+    /**
+     * Return the cached formatter for the given pattern, compiling and caching it on first use.
+     * The cache is created here if needed, so eval also works when open() was not called on this instance.
+     */
+    private DateTimeFormatter getFormatter(String format) {
+        if (formatters == null) {
+            formatters = new HashMap<>();
+        }
+        return formatters.computeIfAbsent(format, DateTimeFormatter::ofPattern);
+    }
+
+}
diff --git 
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
 
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 7604549507..61f497d650 100644
--- 
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ 
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -28,6 +28,7 @@ import org.apache.inlong.sort.function.EncryptFunction;
 import org.apache.inlong.sort.function.JsonGetterFunction;
 import org.apache.inlong.sort.function.RegexpReplaceFirstFunction;
 import org.apache.inlong.sort.function.RegexpReplaceFunction;
+import org.apache.inlong.sort.function.RoundTimestampFunction;
 import org.apache.inlong.sort.parser.Parser;
 import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
 import org.apache.inlong.sort.parser.result.ParseResult;
@@ -125,6 +126,7 @@ public class FlinkSqlParser implements Parser {
         tableEnv.createTemporarySystemFunction("ENCRYPT", 
EncryptFunction.class);
         tableEnv.createTemporarySystemFunction("JSON_GETTER", 
JsonGetterFunction.class);
         
tableEnv.createTemporarySystemFunction(DEFAULT_EMBEDDING_FUNCTION_NAME, 
EmbeddingFunction.class);
+        tableEnv.createTemporarySystemFunction("ROUND_TIMESTAMP", 
RoundTimestampFunction.class);
     }
 
     /**
diff --git 
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
 
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
index 1e254dabba..d7b4a71038 100644
--- 
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
+++ 
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
@@ -22,6 +22,7 @@ import org.apache.inlong.sort.function.EncryptFunction;
 import org.apache.inlong.sort.function.JsonGetterFunction;
 import org.apache.inlong.sort.function.RegexpReplaceFirstFunction;
 import org.apache.inlong.sort.function.RegexpReplaceFunction;
+import org.apache.inlong.sort.function.RoundTimestampFunction;
 import org.apache.inlong.sort.parser.Parser;
 import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
 import org.apache.inlong.sort.parser.result.ParseResult;
@@ -74,6 +75,7 @@ public class NativeFlinkSqlParser implements Parser {
         tableEnv.createTemporarySystemFunction("ENCRYPT", 
EncryptFunction.class);
         tableEnv.createTemporarySystemFunction("JSON_GETTER", 
JsonGetterFunction.class);
         
tableEnv.createTemporarySystemFunction(DEFAULT_EMBEDDING_FUNCTION_NAME, 
EmbeddingFunction.class);
+        tableEnv.createTemporarySystemFunction("ROUND_TIMESTAMP", 
RoundTimestampFunction.class);
     }
 
     /**
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
index be7c98dfa1..5c602523d7 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
@@ -46,7 +46,7 @@ public class JsonGetterFunctionTest extends AbstractTestBase {
     /**
      * Test for JsonGetter function
      *
-     * @throws Exception The exception may throw when test Encrypt function
+     * @throws Exception The exception may throw when test JsonGetter function
      */
     @Test
     public void testJsonGetterFunction() throws Exception {
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RoundTimestampFunctionTest.java
similarity index 57%
copy from 
inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
copy to 
inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RoundTimestampFunctionTest.java
index be7c98dfa1..9fb37c19b0 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/JsonGetterFunctionTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RoundTimestampFunctionTest.java
@@ -17,10 +17,6 @@
 
 package org.apache.inlong.sort.function;
 
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
-
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -39,17 +35,19 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Test for {@link JsonGetterFunction}
+ * Test for {@link RoundTimestampFunction}
  */
-public class JsonGetterFunctionTest extends AbstractTestBase {
+public class RoundTimestampFunctionTest extends AbstractTestBase {
+
+    public static final long TEST_TIMESTAMP = 1702610371L;
 
     /**
-     * Test for JsonGetter function
+     * Test for round timestamp function
      *
-     * @throws Exception The exception may throw when test Encrypt function
+     * @throws Exception The exception may throw when test round timestamp 
function
      */
     @Test
-    public void testJsonGetterFunction() throws Exception {
+    public void testRoundTimestampFunction() throws Exception {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
                 .inStreamingMode()
@@ -58,42 +56,59 @@ public class JsonGetterFunctionTest extends 
AbstractTestBase {
         env.setParallelism(1);
         env.enableCheckpointing(10000);
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
-        // step 1. Register custom function of json getter
-        tableEnv.createTemporaryFunction("JSON_GETTER", 
JsonGetterFunction.class);
+
+        // step 1. Register custom function of ROUND_TIMESTAMP
+        tableEnv.createTemporaryFunction("ROUND_TIMESTAMP", 
RoundTimestampFunction.class);
+
         // step 2. Generate test data and convert to DataStream
+
         List<Row> data = new ArrayList<>();
-        data.add(Row.of("{\"name\":\"abc\"}"));
-        TypeInformation<?>[] types = {
-                BasicTypeInfo.STRING_TYPE_INFO};
-        String[] names = {"content"};
+        data.add(Row.of(TEST_TIMESTAMP));
+        TypeInformation<?>[] types = {BasicTypeInfo.LONG_TYPE_INFO};
+
+        String[] names = {"f1"};
         RowTypeInfo typeInfo = new RowTypeInfo(types, names);
         DataStream<Row> dataStream = 
env.fromCollection(data).returns(typeInfo);
 
-        // step 3. Convert from DataStream to Table and execute the json 
getter function
-        Table tempView = tableEnv.fromDataStream(dataStream).as("content");
-        tableEnv.createTemporaryView("temp_view", tempView);
+        String formattedTimestamp = "2023121510";
 
-        
org.apache.inlong.sort.protocol.transformation.function.JsonGetterFunction 
jsonGetterFunction =
-                new 
org.apache.inlong.sort.protocol.transformation.function.JsonGetterFunction(
-                        new FieldInfo("content",
-                                new StringFormatInfo()),
-                        new StringConstantParam("name"));
+        // step 3. Convert from DataStream to Table and execute the 
ROUND_TIMESTAMP function
+        Table tempView = tableEnv.fromDataStream(dataStream).as("f1");
+        tableEnv.createTemporaryView("temp_view", tempView);
+        Table outputTable = tableEnv.sqlQuery(
+                "SELECT ROUND_TIMESTAMP(f1, 600, 'yyyyMMddmm') " +
+                        "from temp_view");
 
-        String sqlQuery = String.format("SELECT %s as content FROM temp_view", 
jsonGetterFunction.format());
-        Table outputTable = tableEnv.sqlQuery(sqlQuery);
         // step 4. Get function execution result and parse it
         DataStream<Row> resultSet = tableEnv.toAppendStream(outputTable, 
Row.class);
         List<String> result = new ArrayList<>();
+        for (CloseableIterator<Row> it = resultSet.executeAndCollect(); 
it.hasNext();) {
+            Row row = it.next();
+            if (row != null) {
+                result.add(row.getField(0).toString());
+            }
+        }
 
-        for (CloseableIterator<String> it = resultSet.map(s -> 
s.getField(0).toString()).executeAndCollect(); it
-                .hasNext();) {
-            String next = it.next();
-            result.add(next);
+        Assert.assertEquals(result.size(), 1);
+        Assert.assertEquals(result.get(0), formattedTimestamp);
+
+        // step 5. provide a different format and check the result
+        Table outputTable2 = tableEnv.sqlQuery(
+                "SELECT ROUND_TIMESTAMP(f1, 600, 'yyyyMMddmmss') " +
+                        "from temp_view");
+
+        DataStream<Row> resultSet2 = tableEnv.toAppendStream(outputTable2, 
Row.class);
+        List<String> result2 = new ArrayList<>();
+        for (CloseableIterator<Row> it = resultSet2.executeAndCollect(); 
it.hasNext();) {
+            Row row = it.next();
+            if (row != null) {
+                result2.add(row.getField(0).toString());
+            }
         }
 
-        // step 5. Whether the comparison results are as expected
-        String expect = "abc";
-        Assert.assertEquals(expect, result.get(0));
+        Assert.assertEquals(result2.size(), 1);
+        Assert.assertEquals(result2.get(0), formattedTimestamp + "00");
+
     }
 
-}
+}
\ No newline at end of file

Reply via email to