alpreu commented on a change in pull request #18058:
URL: https://github.com/apache/flink/pull/18058#discussion_r765507981
##########
File path:
flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
##########
@@ -181,6 +182,23 @@ public void testDynamicIndexDefaultFormat() {
Assertions.assertEquals("my-index-12_13_14",
indexGenerator.generate(rows.get(1)));
}
+ @Test
+ public void testProcTimeDynamicIndex() {
Review comment:
```suggestion
public void testDynamicIndexFromProcTime() {
```
##########
File path:
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
##########
@@ -176,6 +197,8 @@ private static DynamicFormatter createFormatFunction(
private static final Pattern dynamicIndexPattern =
Pattern.compile("\\{[^\\{\\}]+\\}?");
private static final Pattern dynamicIndexTimeExtractPattern =
Pattern.compile(".*\\{.+\\|.*\\}.*");
+ private static final Pattern
procTimeDynamicIndexProcTimeExtractPattern =
Review comment:
```suggestion
private static final Pattern dynamicIndexProcTimeExtractPattern =
```
##########
File path:
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
##########
@@ -240,6 +263,11 @@ boolean checkIsDynamicIndexWithFormat(String index) {
return dynamicIndexTimeExtractPattern.matcher(index).matches();
}
+ /** Check generate a dynamic index is based on processing time. */
+ boolean checkIsProcTimeDynamicIndexWithFormat(String index) {
Review comment:
```suggestion
boolean checkIsDynamicIndexWithProcTimeFormat(String index) {
```
##########
File path:
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
##########
@@ -168,6 +183,12 @@ private static DynamicFormatter createFormatFunction(
}
}
+ private static DynamicFormatter createProcTimeFormatFunction() {
+ return (value, dateTimeFormatter) -> {
+ return LocalDateTime.now().format(dateTimeFormatter);
+ };
+ }
+
Review comment:
`createProcTimeFormatFunction()` does not appear to be used anywhere, is it? If it is unused, please remove it.
##########
File path:
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
##########
@@ -88,6 +89,20 @@ private static IndexGenerator createRuntimeIndexGenerator(
index.substring(indexPrefix.length() +
dynamicIndexPatternStr.length());
final boolean isDynamicIndexWithFormat =
indexHelper.checkIsDynamicIndexWithFormat(index);
+ final boolean isProcTimeDynamicIndex =
Review comment:
```suggestion
final boolean isDynamicIndexWithProcTimeFormat =
```
##########
File path:
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
##########
@@ -88,6 +89,20 @@ private static IndexGenerator createRuntimeIndexGenerator(
index.substring(indexPrefix.length() +
dynamicIndexPatternStr.length());
final boolean isDynamicIndexWithFormat =
indexHelper.checkIsDynamicIndexWithFormat(index);
+ final boolean isProcTimeDynamicIndex =
+ indexHelper.checkIsProcTimeDynamicIndexWithFormat(index);
+
+ if (isProcTimeDynamicIndex) {
+ final String dateTimeFormat =
+ indexHelper.extractDateFormat(index,
LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
Review comment:
Could you explain why you are using `TIME_WITHOUT_TIME_ZONE` here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org