hackergin commented on a change in pull request #18058:
URL: https://github.com/apache/flink/pull/18058#discussion_r772471170
##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkBaseITCase.java
##########
@@ -302,6 +306,58 @@ public void testWritingDocumentsWithDynamicIndex() throws Exception {
         Assertions.assertEquals(response, expectedMap);
     }
+    @Test
+    public void testWritingDocumentsWithDynamicIndexFromProcTime() throws Exception {
+        TableEnvironment tableEnvironment =
+                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+        tableEnvironment
+                .getConfig()
+                .getConfiguration()
+                .setString("table.local-time-zone", "Asia/Shanghai");
+
+        String dynamicIndex1 =
+                "dynamic-index-"
+                        + dateTimeFormatter.format(LocalDateTime.now(ZoneId.of("Asia/Shanghai")));
+        String index = "dynamic-index-{now()|yyyy-MM-dd}";
Review comment:
> It looks good, but I have some more general comments/questions:
>
> * We borrow `now` from Elasticsearch, but we create the concrete index names
>   ourselves using our own utilities instead of letting Elasticsearch do that
>   using its system time. Have you considered that? If we do our own parsing,
>   date formatting, etc., then whenever something changes in Elasticsearch's
>   index patterns in the future, we need to accommodate those changes in our
>   code in order to keep creating the correct concrete index name.
> * Maybe we should consider using a metadata column, where the user can
>   define the expression that gives the proctime, to offer more flexibility
>   instead of using `LocalDateTime.now()` in the connector.
> * Maybe we should extend this flexibility so that the whole index pattern
>   (prefix + date pattern + suffix) is defined by the user through this
>   metadata column. Then the same sink table could be used by different SQL
>   INSERT statements, each inserting into a different index based on the
>   metadata column, without having to create multiple sink tables in SQL,
>   each with a more static index name.
> * In all of these cases, including the current implementation in this PR,
>   we cannot support retractions for this sink: a record can be inserted
>   into, for example, `my-index-2021-12-20`, but a later delete/update could
>   reference the next day's index (`my-index-2021-12-21`) and cause issues.
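The first and last points can be illustrated with a small, self-contained sketch. It assumes the connector resolves a single `{now()|<pattern>}` placeholder on the Flink side with `java.time` formatting in the session's local time zone (`Asia/Shanghai`, as in the test above); the class name, the regex, and the `resolve` method are hypothetical and not taken from the PR. Because the date comes from processing time, an insert handled just before midnight and a delete for the same key handled just after it resolve to different concrete indices.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch (not the connector's code): resolves an index pattern of the form
 * "prefix{now()|<DateTimeFormatter pattern>}suffix" on the Flink side, using a clock in
 * the configured local time zone.
 */
public class ProcTimeIndexSketch {

    // A single "{now()|<date pattern>}" placeholder; group 1 captures the date pattern.
    private static final Pattern NOW_PLACEHOLDER = Pattern.compile("\\{now\\(\\)\\|([^}]+)\\}");

    static String resolve(String indexPattern, Clock clock) {
        Matcher m = NOW_PLACEHOLDER.matcher(indexPattern);
        if (!m.find()) {
            return indexPattern; // static index name, nothing to resolve
        }
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(m.group(1));
        // LocalDateTime.now(clock) reads the wall-clock time in the clock's zone.
        String date = formatter.format(LocalDateTime.now(clock));
        return indexPattern.substring(0, m.start()) + date + indexPattern.substring(m.end());
    }

    public static void main(String[] args) {
        String pattern = "my-index-{now()|yyyy-MM-dd}";
        ZoneId shanghai = ZoneId.of("Asia/Shanghai"); // UTC+8, no DST

        // INSERT processed at 23:59 local time, DELETE for the same key two minutes later.
        Clock insertClock = Clock.fixed(Instant.parse("2021-12-20T15:59:00Z"), shanghai);
        Clock deleteClock = Clock.fixed(Instant.parse("2021-12-20T16:01:00Z"), shanghai);

        String insertIndex = resolve(pattern, insertClock);
        String deleteIndex = resolve(pattern, deleteClock);
        System.out.println("INSERT -> " + insertIndex); // prints INSERT -> my-index-2021-12-20
        System.out.println("DELETE -> " + deleteIndex); // prints DELETE -> my-index-2021-12-21
    }
}
```

Passing a `Clock` instead of calling `LocalDateTime.now()` directly keeps the resolution deterministic in tests. With Elasticsearch's own date math index names (for example `<my-index-{now/d}>`), the date would instead be resolved by Elasticsearch with its system time when each request arrives; that is the alternative the first point raises, though it would not remove the retraction problem, since the resolved index would still depend on when each request is processed.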
Thanks for the review and for raising these concerns about this feature.
To be honest, I hadn't considered this deeply before.
Currently, the Elasticsearch connector manages the creation of Elasticsearch indices. Users should make sure the index is created correctly before running the Flink job. The index pattern is used to generate the right index name.
So I don't think we need to follow Elasticsearch's behavior if its index patterns change in the future.
I also agree with adding metadata column support, so that users can generate dynamic indices more flexibly.
I'd prefer to keep this change and open a new ticket to add metadata column support. cc @wuchong
--
This is an automated message from the Apache Git Service.