This is an automated email from the ASF dual-hosted git repository.

reswqa pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git


The following commit(s) were added to refs/heads/main by this push:
     new b24cb39  [FLINK-39774] Format YEAR-MONTH interval keys as months 
instead of days
b24cb39 is described below

commit b24cb39151f10c4950d88c9702fb5eaa7f683192
Author: vernedeng <[email protected]>
AuthorDate: Thu May 28 15:06:23 2026 +0800

    [FLINK-39774] Format YEAR-MONTH interval keys as months instead of days
---
 .../connector/elasticsearch/table/KeyExtractor.java  |  2 +-
 .../elasticsearch/table/KeyExtractorTest.java        | 20 ++++++++++++++++++++
 .../connector/elasticsearch/table/KeyExtractor.java  |  2 +-
 .../elasticsearch/table/KeyExtractorTest.java        | 20 ++++++++++++++++++++
 4 files changed, 42 insertions(+), 2 deletions(-)

diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/KeyExtractor.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/KeyExtractor.java
index f369ee0..25cb897 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/KeyExtractor.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/KeyExtractor.java
@@ -84,7 +84,7 @@ class KeyExtractor implements SerializableFunction<RowData, 
String> {
                 return (row) ->
                         LocalTime.ofNanoOfDay((long) row.getInt(index) * 
1_000_000L).toString();
             case INTERVAL_YEAR_MONTH:
-                return (row) -> Period.ofDays(row.getInt(index)).toString();
+                return (row) -> Period.ofMonths(row.getInt(index)).toString();
             case INTERVAL_DAY_TIME:
                 return (row) -> 
Duration.ofMillis(row.getLong(index)).toString();
             case DISTINCT_TYPE:
diff --git 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/KeyExtractorTest.java
 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/KeyExtractorTest.java
index 29f7502..ecb58f7 100644
--- 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/KeyExtractorTest.java
+++ 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/KeyExtractorTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
 
 import org.junit.jupiter.api.Test;
 
@@ -36,6 +38,8 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND;
+import static 
org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link KeyExtractor}. */
@@ -89,6 +93,22 @@ public class KeyExtractorTest {
         assertThat(key).isEqualTo("12_2012-12-12T12:12:12");
     }
 
+    @Test
+    public void testIntervalTypesKey() {
+        List<LogicalTypeWithIndex> logicalTypesWithIndex =
+                Stream.of(
+                                new LogicalTypeWithIndex(
+                                        0, new 
YearMonthIntervalType(YEAR_TO_MONTH)),
+                                new LogicalTypeWithIndex(1, new 
DayTimeIntervalType(DAY_TO_SECOND)))
+                        .collect(Collectors.toList());
+
+        Function<RowData, String> keyExtractor =
+                KeyExtractor.createKeyExtractor(logicalTypesWithIndex, "_");
+
+        String key = keyExtractor.apply(GenericRowData.of(13, 3_600_000L));
+        assertThat(key).isEqualTo("P13M_PT1H");
+    }
+
     @Test
     public void testAllTypesKey() {
         List<LogicalTypeWithIndex> logicalTypesWithIndex =
diff --git 
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/KeyExtractor.java
 
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/KeyExtractor.java
index be456f4..ef472ed 100644
--- 
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/KeyExtractor.java
+++ 
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/KeyExtractor.java
@@ -87,7 +87,7 @@ class KeyExtractor implements SerializableFunction<RowData, 
String> {
                 return (row) ->
                         LocalTime.ofNanoOfDay((long) row.getInt(index) * 
1_000_000L).toString();
             case INTERVAL_YEAR_MONTH:
-                return (row) -> Period.ofDays(row.getInt(index)).toString();
+                return (row) -> Period.ofMonths(row.getInt(index)).toString();
             case INTERVAL_DAY_TIME:
                 return (row) -> 
Duration.ofMillis(row.getLong(index)).toString();
             case DISTINCT_TYPE:
diff --git 
a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/KeyExtractorTest.java
 
b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/KeyExtractorTest.java
index 5cdce90..cbc7f83 100644
--- 
a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/KeyExtractorTest.java
+++ 
b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/KeyExtractorTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
 
 import org.junit.jupiter.api.Test;
 
@@ -39,6 +41,8 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND;
+import static 
org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link KeyExtractor}. */
@@ -92,6 +96,22 @@ public class KeyExtractorTest {
         assertThat(key).isEqualTo("12_2012-12-12T12:12:12");
     }
 
+    @Test
+    public void testIntervalTypesKey() {
+        List<LogicalTypeWithIndex> logicalTypesWithIndex =
+                Stream.of(
+                                new LogicalTypeWithIndex(
+                                        0, new 
YearMonthIntervalType(YEAR_TO_MONTH)),
+                                new LogicalTypeWithIndex(1, new 
DayTimeIntervalType(DAY_TO_SECOND)))
+                        .collect(Collectors.toList());
+
+        Function<RowData, String> keyExtractor =
+                KeyExtractor.createKeyExtractor(logicalTypesWithIndex, "_");
+
+        String key = keyExtractor.apply(GenericRowData.of(13, 3_600_000L));
+        assertThat(key).isEqualTo("P13M_PT1H");
+    }
+
     @Test
     public void testAllTypesKey() {
         List<LogicalTypeWithIndex> logicalTypesWithIndex =

Reply via email to