This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ff6f2c  [FLINK-24571][connectors/elasticsearch] Supports a system 
time function(now() and current_timestamp) in index pattern
0ff6f2c is described below

commit 0ff6f2cc78f3f45ed5286b1ff1af613571d0ba1f
Author: jinfeng <[email protected]>
AuthorDate: Thu Dec 9 20:36:48 2021 +0800

    [FLINK-24571][connectors/elasticsearch] Supports a system time 
function(now() and current_timestamp) in index pattern
---
 .../docs/connectors/table/elasticsearch.md         |  4 ++
 .../content/docs/connectors/table/elasticsearch.md |  5 ++
 .../table/ElasticsearchDynamicSink.java            | 24 +++++++-
 .../table/ElasticsearchDynamicSinkFactoryBase.java | 16 ++++-
 .../elasticsearch/table/IndexGeneratorFactory.java | 45 ++++++++++++--
 .../table/ElasticsearchDynamicSinkBaseITCase.java  | 58 ++++++++++++++++++
 .../ElasticsearchDynamicSinkFactoryBaseTest.java   | 23 ++++++++
 .../elasticsearch/table/IndexGeneratorTest.java    | 69 ++++++++++++++++++++++
 8 files changed, 236 insertions(+), 8 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/elasticsearch.md 
b/docs/content.zh/docs/connectors/table/elasticsearch.md
index a3f6001..339d9d5 100644
--- a/docs/content.zh/docs/connectors/table/elasticsearch.md
+++ b/docs/content.zh/docs/connectors/table/elasticsearch.md
@@ -263,6 +263,10 @@ Elasticsearch sink 同时支持静态索引和动态索引。
 `date_format_string` 与 Java 的 
[DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/index.html) 兼容。
 例如,如果选项值设置为 `'myusers-{log_ts|yyyy-MM-dd}'`,则 `log_ts` 字段值为 `2020-03-27 
12:25:55` 的记录将被写入到 "myusers-2020-03-27" 索引中。
 
+你也可以使用 `'{now()|date_format_string}'` 将当前的系统时间转换为 `date_format_string` 
指定的格式。`now()` 对应的时间类型是 `TIMESTAMP_WITH_LTZ` 。
+在将系统时间格式化为字符串时会使用 session 中通过 `table.local-time-zone` 中配置的时区。 使用 `NOW()`, 
`now()`, `CURRENT_TIMESTAMP`, `current_timestamp` 均可以。
+
+**注意:** 使用当前系统时间生成的动态索引时, 对于 changelog 的流,无法保证同一主键对应的记录能产生相同的索引名, 
因此使用基于系统时间的动态索引,只能支持 append only 的流。
 
 数据类型映射
 ----------------
diff --git a/docs/content/docs/connectors/table/elasticsearch.md 
b/docs/content/docs/connectors/table/elasticsearch.md
index 1054c6f..78d0ce9 100644
--- a/docs/content/docs/connectors/table/elasticsearch.md
+++ b/docs/content/docs/connectors/table/elasticsearch.md
@@ -298,6 +298,11 @@ You can also use `'{field_name|date_format_string}'` to 
convert a field value of
 The `date_format_string` is compatible with Java's 
[DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/index.html).
 For example, if the option value is `'myusers-{log_ts|yyyy-MM-dd}'`, then a 
record with `log_ts` field value `2020-03-27 12:25:55` will be written into 
"myusers-2020-03-27" index.
 
+You can also use `'{now()|date_format_string}'` to convert the current system 
time to the format specified by `date_format_string`. The corresponding time 
type of `now()` is `TIMESTAMP_WITH_LTZ`.
+When formatting the system time as a string, the time zone configured in the 
session through `table.local-time-zone` will be used. You can use `NOW()`, 
`now()`, `CURRENT_TIMESTAMP`, `current_timestamp`.
+
+**NOTE:**  When using the dynamic index generated by the current system time, 
for changelog stream, there is no guarantee that the records with the same 
primary key can generate the same index name. 
+Therefore, the dynamic index based on the system time can only support append 
only stream.
 
 Data Type Mapping
 ----------------
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
index 975db5c..0fd389b 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
 import 
org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBuilderBase;
 import org.apache.flink.connector.elasticsearch.sink.FlushBackoffType;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -37,6 +38,7 @@ import org.elasticsearch.common.xcontent.XContentType;
 
 import javax.annotation.Nullable;
 
+import java.time.ZoneId;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
@@ -54,10 +56,12 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
     final DataType physicalRowDataType;
     final List<LogicalTypeWithIndex> primaryKeyLogicalTypesWithIndex;
     final ElasticsearchConfiguration config;
+    final ZoneId localTimeZoneId;
 
     final String summaryString;
     final ElasticsearchSinkBuilderSupplier<RowData> builderSupplier;
     @Nullable final String documentType;
+    final boolean isDynamicIndexWithSystemTime;
 
     ElasticsearchDynamicSink(
             EncodingFormat<SerializationSchema<RowData>> format,
@@ -66,7 +70,8 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
             DataType physicalRowDataType,
             String summaryString,
             ElasticsearchSinkBuilderSupplier<RowData> builderSupplier,
-            @Nullable String documentType) {
+            @Nullable String documentType,
+            ZoneId localTimeZoneId) {
         this.format = checkNotNull(format);
         this.physicalRowDataType = checkNotNull(physicalRowDataType);
         this.primaryKeyLogicalTypesWithIndex = 
checkNotNull(primaryKeyLogicalTypesWithIndex);
@@ -74,6 +79,13 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
         this.summaryString = checkNotNull(summaryString);
         this.builderSupplier = checkNotNull(builderSupplier);
         this.documentType = documentType;
+        this.localTimeZoneId = localTimeZoneId;
+        this.isDynamicIndexWithSystemTime = isDynamicIndexWithSystemTime();
+    }
+
+    public boolean isDynamicIndexWithSystemTime() {
+        IndexGeneratorFactory.IndexHelper indexHelper = new 
IndexGeneratorFactory.IndexHelper();
+        return 
indexHelper.checkIsDynamicIndexWithSystemTimeFormat(config.getIndex());
     }
 
     Function<RowData, String> createKeyExtractor() {
@@ -85,7 +97,8 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
         return IndexGeneratorFactory.createIndexGenerator(
                 config.getIndex(),
                 DataType.getFieldNames(physicalRowDataType),
-                DataType.getFieldDataTypes(physicalRowDataType));
+                DataType.getFieldDataTypes(physicalRowDataType),
+                localTimeZoneId);
     }
 
     @Override
@@ -96,6 +109,10 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
                 builder.addContainedKind(kind);
             }
         }
+        if (isDynamicIndexWithSystemTime && 
!requestedMode.containsOnly(RowKind.INSERT)) {
+            throw new ValidationException(
+                    "Dynamic indexing based on system time only works on 
append only stream.");
+        }
         return builder.build();
     }
 
@@ -169,7 +186,8 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
                 physicalRowDataType,
                 summaryString,
                 builderSupplier,
-                documentType);
+                documentType,
+                localTimeZoneId);
     }
 
     @Override
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
index af35888..ed5e7f7 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
@@ -21,7 +21,9 @@ package org.apache.flink.connector.elasticsearch.table;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.Projection;
@@ -37,6 +39,7 @@ import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
 
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -106,13 +109,24 @@ abstract class ElasticsearchDynamicSinkFactoryBase 
implements DynamicTableSinkFa
                 context.getPhysicalRowDataType(),
                 capitalize(factoryIdentifier),
                 sinkBuilderSupplier,
-                getDocumentType(config));
+                getDocumentType(config),
+                getLocalTimeZoneId(context.getConfiguration()));
     }
 
     ElasticsearchConfiguration getConfiguration(FactoryUtil.TableFactoryHelper 
helper) {
         return new ElasticsearchConfiguration(helper.getOptions());
     }
 
+    ZoneId getLocalTimeZoneId(ReadableConfig readableConfig) {
+        final String zone = 
readableConfig.get(TableConfigOptions.LOCAL_TIME_ZONE);
+        final ZoneId zoneId =
+                TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
+                        ? ZoneId.systemDefault()
+                        : ZoneId.of(zone);
+
+        return zoneId;
+    }
+
     void validateConfiguration(ElasticsearchConfiguration config) {
         config.getHosts(); // validate hosts
         validate(
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
index baf0e04..ec2a006 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
@@ -30,7 +30,9 @@ import javax.annotation.Nonnull;
 
 import java.io.Serializable;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
@@ -63,30 +65,57 @@ final class IndexGeneratorFactory {
     private IndexGeneratorFactory() {}
 
     public static IndexGenerator createIndexGenerator(
-            String index, List<String> fieldNames, List<DataType> dataTypes) {
+            String index,
+            List<String> fieldNames,
+            List<DataType> dataTypes,
+            ZoneId localTimeZoneId) {
         final IndexHelper indexHelper = new IndexHelper();
         if (indexHelper.checkIsDynamicIndex(index)) {
             return createRuntimeIndexGenerator(
                     index,
                     fieldNames.toArray(new String[0]),
                     dataTypes.toArray(new DataType[0]),
-                    indexHelper);
+                    indexHelper,
+                    localTimeZoneId);
         } else {
             return new StaticIndexGenerator(index);
         }
     }
 
+    public static IndexGenerator createIndexGenerator(
+            String index, List<String> fieldNames, List<DataType> dataTypes) {
+        return createIndexGenerator(index, fieldNames, dataTypes, 
ZoneId.systemDefault());
+    }
+
     interface DynamicFormatter extends Serializable {
         String format(@Nonnull Object fieldValue, DateTimeFormatter formatter);
     }
 
     private static IndexGenerator createRuntimeIndexGenerator(
-            String index, String[] fieldNames, DataType[] fieldTypes, 
IndexHelper indexHelper) {
+            String index,
+            String[] fieldNames,
+            DataType[] fieldTypes,
+            IndexHelper indexHelper,
+            ZoneId localTimeZoneId) {
         final String dynamicIndexPatternStr = 
indexHelper.extractDynamicIndexPatternStr(index);
         final String indexPrefix = index.substring(0, 
index.indexOf(dynamicIndexPatternStr));
         final String indexSuffix =
                 index.substring(indexPrefix.length() + 
dynamicIndexPatternStr.length());
 
+        if (indexHelper.checkIsDynamicIndexWithSystemTimeFormat(index)) {
+            final String dateTimeFormat =
+                    indexHelper.extractDateFormat(
+                            index, 
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+            return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
+                @Override
+                public String generate(RowData row) {
+                    return indexPrefix
+                            
.concat(LocalDateTime.now(localTimeZoneId).format(dateTimeFormatter))
+                            .concat(indexSuffix);
+                }
+            };
+        }
+
         final boolean isDynamicIndexWithFormat = 
indexHelper.checkIsDynamicIndexWithFormat(index);
         final int indexFieldPos =
                 indexHelper.extractIndexFieldPos(index, fieldNames, 
isDynamicIndexWithFormat);
@@ -172,10 +201,13 @@ final class IndexGeneratorFactory {
      * Helper class for {@link IndexGeneratorFactory}, this helper can use to 
validate index field
      * type ans parse index format from pattern.
      */
-    private static class IndexHelper {
+    static class IndexHelper {
         private static final Pattern dynamicIndexPattern = 
Pattern.compile("\\{[^\\{\\}]+\\}?");
         private static final Pattern dynamicIndexTimeExtractPattern =
                 Pattern.compile(".*\\{.+\\|.*\\}.*");
+        private static final Pattern dynamicIndexSystemTimeExtractPattern =
+                Pattern.compile(
+                        
".*\\{\\s*(now\\(\\s*\\)|NOW\\(\\s*\\)|current_timestamp|CURRENT_TIMESTAMP)\\s*\\|.*\\}.*");
         private static final List<LogicalTypeRoot> supportedTypes = new 
ArrayList<>();
         private static final Map<LogicalTypeRoot, String> defaultFormats = new 
HashMap<>();
 
@@ -240,6 +272,11 @@ final class IndexGeneratorFactory {
             return dynamicIndexTimeExtractPattern.matcher(index).matches();
         }
 
+        /** Check generate dynamic index is from system time or not. */
+        boolean checkIsDynamicIndexWithSystemTimeFormat(String index) {
+            return 
dynamicIndexSystemTimeExtractPattern.matcher(index).matches();
+        }
+
         /** Extract dynamic index pattern string from index pattern string. */
         String extractDynamicIndexPatternStr(String index) {
             int start = index.indexOf("{");
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkBaseITCase.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkBaseITCase.java
index 192d36c..21ad5fd 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkBaseITCase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkBaseITCase.java
@@ -38,8 +38,10 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHits;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -50,6 +52,8 @@ import java.time.Duration;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -299,4 +303,58 @@ abstract class ElasticsearchDynamicSinkBaseITCase {
         expectedMap.put("b", "2012-12-12 12:12:12");
         Assertions.assertEquals(response, expectedMap);
     }
+
+    @Test
+    public void testWritingDocumentsWithDynamicIndexFromSystemTime() throws 
Exception {
+        TableEnvironment tableEnvironment =
+                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+        DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd");
+        tableEnvironment
+                .getConfig()
+                .getConfiguration()
+                .setString("table.local-time-zone", "Asia/Shanghai");
+
+        String dynamicIndex1 =
+                "dynamic-index-"
+                        + 
dateTimeFormatter.format(LocalDateTime.now(ZoneId.of("Asia/Shanghai")))
+                        + "_index";
+        String index = "dynamic-index-{now()|yyyy-MM-dd}_index";
+        tableEnvironment.executeSql(
+                "CREATE TABLE esTable ("
+                        + "a BIGINT NOT NULL,\n"
+                        + "b TIMESTAMP NOT NULL,\n"
+                        + "PRIMARY KEY (a) NOT ENFORCED\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + getConnectorSql(index)
+                        + ")");
+        String dynamicIndex2 =
+                "dynamic-index-"
+                        + 
dateTimeFormatter.format(LocalDateTime.now(ZoneId.of("Asia/Shanghai")))
+                        + "_index";
+
+        tableEnvironment
+                .fromValues(row(1L, 
LocalDateTime.parse("2012-12-12T12:12:12")))
+                .executeInsert("esTable")
+                .await();
+
+        RestHighLevelClient client = getClient();
+
+        Map<String, Object> response;
+        try {
+            response = makeGetRequest(client, dynamicIndex1, "1");
+        } catch (ElasticsearchStatusException e) {
+            if (e.status() == RestStatus.NOT_FOUND) {
+                response = makeGetRequest(client, dynamicIndex2, "1");
+            } else {
+                throw e;
+            }
+        }
+
+        Map<Object, Object> expectedMap = new HashMap<>();
+        expectedMap.put("a", 1);
+        expectedMap.put("b", "2012-12-12 12:12:12");
+        Assertions.assertEquals(response, expectedMap);
+    }
 }
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java
index 7a1e04f..8fa0a49 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java
@@ -25,8 +25,10 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.TestLoggerExtension;
 
 import org.junit.jupiter.api.Assertions;
@@ -235,6 +237,27 @@ abstract class ElasticsearchDynamicSinkFactoryBaseTest {
     }
 
     @Test
+    public void validateDynamicIndexOnChangelogStream() {
+        ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
+        DynamicTableSink sink =
+                sinkFactory.createDynamicTableSink(
+                        createPrefilledTestContext()
+                                .withOption(
+                                        
ElasticsearchConnectorOptions.INDEX_OPTION.key(),
+                                        
"dynamic-index-{now()|yyyy-MM-dd}_index")
+                                .build());
+
+        ChangelogMode changelogMode =
+                ChangelogMode.newBuilder()
+                        .addContainedKind(RowKind.DELETE)
+                        .addContainedKind(RowKind.INSERT)
+                        .build();
+        assertValidationException(
+                "Dynamic indexing based on system time only works on append 
only stream.",
+                () -> sink.getChangelogMode(changelogMode));
+    }
+
+    @Test
     public void testSinkParallelism() {
         ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
         DynamicTableSink sink =
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
index de2aad5..7840bfa 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
@@ -35,6 +35,7 @@ import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
 import java.time.temporal.UnsupportedTemporalTypeException;
 import java.util.Arrays;
 import java.util.List;
@@ -182,6 +183,74 @@ public class IndexGeneratorTest {
     }
 
     @Test
+    public void testDynamicIndexFromSystemTime() {
+        List<String> supportedUseCases =
+                Arrays.asList(
+                        "now()",
+                        "NOW()",
+                        "now( )",
+                        "NOW(\t)",
+                        "\t NOW( ) \t",
+                        "current_timestamp",
+                        "CURRENT_TIMESTAMP",
+                        "\tcurrent_timestamp\t",
+                        " current_timestamp ");
+
+        supportedUseCases.stream()
+                .forEach(
+                        f -> {
+                            DateTimeFormatter dateTimeFormatter =
+                                    DateTimeFormatter.ofPattern("yyyy_MM_dd");
+                            IndexGenerator indexGenerator =
+                                    IndexGeneratorFactory.createIndexGenerator(
+                                            
String.format("my-index-{%s|yyyy_MM_dd}", f),
+                                            fieldNames,
+                                            dataTypes);
+                            indexGenerator.open();
+                            // The date may change during the running of the 
unit test.
+                            // Generate expected index-name based on the 
current time
+                            // before and after calling the generate method.
+                            String expectedIndex1 =
+                                    "my-index-" + 
LocalDateTime.now().format(dateTimeFormatter);
+                            String actualIndex = 
indexGenerator.generate(rows.get(1));
+                            String expectedIndex2 =
+                                    "my-index-" + 
LocalDateTime.now().format(dateTimeFormatter);
+                            Assertions.assertTrue(
+                                    actualIndex.equals(expectedIndex1)
+                                            || 
actualIndex.equals(expectedIndex2));
+                        });
+
+        List<String> invalidUseCases =
+                Arrays.asList(
+                        "now",
+                        "now(",
+                        "NOW",
+                        "NOW)",
+                        "current_timestamp()",
+                        "CURRENT_TIMESTAMP()",
+                        "CURRENT_timestamp");
+        invalidUseCases.stream()
+                .forEach(
+                        f -> {
+                            String expectedExceptionMsg =
+                                    String.format(
+                                            "Unknown field '%s' in index 
pattern 'my-index-{%s|yyyy_MM_dd}',"
+                                                    + " please check the field 
name.",
+                                            f, f);
+                            try {
+                                IndexGenerator indexGenerator =
+                                        
IndexGeneratorFactory.createIndexGenerator(
+                                                
String.format("my-index-{%s|yyyy_MM_dd}", f),
+                                                fieldNames,
+                                                dataTypes);
+                                indexGenerator.open();
+                            } catch (TableException e) {
+                                Assertions.assertEquals(expectedExceptionMsg, 
e.getMessage());
+                            }
+                        });
+    }
+
+    @Test
     public void testGeneralDynamicIndex() {
         IndexGenerator indexGenerator =
                 IndexGeneratorFactory.createIndexGenerator("index_{item}", 
fieldNames, dataTypes);

Reply via email to