This is an automated email from the ASF dual-hosted git repository. reswqa pushed a commit to branch v4.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git
commit ab38546363770c37ce3cb784fd489aeba0e34a24
Author: garyjgliu <[email protected]>
AuthorDate: Wed Sep 17 17:49:51 2025 +0800

    [FLINK-38373][Connectors / ElasticSearch] ES index supports fields suffix

    (cherry picked from commit c7f813c85b0d29b4ac2270958b9f4debc39530ff)
---
NOTE(review): this revision differs from the pushed commit above.
- The suffix index generator no longer throws StringIndexOutOfBoundsException
  when the suffix field value is shorter than 'index.suffix.field.length'.
  Such a value is now used as-is.
- Both new option descriptions now state how the options are applied.
- The new factory methods are documented.
- A regression test covers the short-value case.
Hunk headers and the diffstat below reflect these changes. The blob ids on the
'index' lines are still those of the original commit.

 .../table/ElasticsearchConfiguration.java          |  8 ++
 .../table/ElasticsearchConnectorOptions.java       | 17 ++++
 .../table/ElasticsearchDynamicSink.java            |  2 +-
 .../ElasticsearchDynamicTableFactoryBase.java      |  6 +-
 .../elasticsearch/table/IndexGeneratorFactory.java | 65 +++++++++++++++
 .../elasticsearch/table/IndexGeneratorTest.java    | 95 +++++++++++++++++++++-
 6 files changed, 189 insertions(+), 4 deletions(-)

diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
index 6c74200..35db69f 100644
--- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
@@ -135,6 +135,14 @@ public class ElasticsearchConfiguration {
         return config.get(RETRY_ON_CONFLICTS);
     }
 
+    public String getIndexSuffixFieldName() {
+        return config.get(ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_NAME_OPTION);
+    }
+
+    public int getIndexSuffixFieldLength() {
+        return config.get(ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_LENGTH_OPTION);
+    }
+
     /**
      * Parse Hosts String to list.
      *
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
index f34cb87..7817a0d 100644
--- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
@@ -152,4 +152,21 @@ public class ElasticsearchConnectorOptions {
                     .defaultValue(-1)
                     .withDescription(
                             "The number of retry when conflicts with concurrent requests.");
+
+    public static final ConfigOption<String> INDEX_SUFFIX_FIELD_NAME_OPTION =
+            ConfigOptions.key("index.suffix.field.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the field whose value is appended to the 'index' option to"
+                                    + " build the target index name.");
+
+    public static final ConfigOption<Integer> INDEX_SUFFIX_FIELD_LENGTH_OPTION =
+            ConfigOptions.key("index.suffix.field.length")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "The maximum number of leading characters of the index suffix field"
+                                    + " value to use; shorter values are used as-is."
+                                    + " Values <= 0 disable truncation.");
 }
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
index d6d04a6..a522613 100644
--- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
@@ -98,7 +98,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
 
     IndexGenerator createIndexGenerator() {
         return IndexGeneratorFactory.createIndexGenerator(
-                config.getIndex(),
+                config,
                 DataType.getFieldNames(physicalRowDataType),
                 DataType.getFieldDataTypes(physicalRowDataType),
                 localTimeZoneId);
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
index 2686e2e..f223380 100644
--- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
@@ -70,6 +70,8 @@ import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnec
 import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.FORMAT_OPTION;
 import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
 import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
+import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_LENGTH_OPTION;
+import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_NAME_OPTION;
 import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
 import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
 import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT;
@@ -287,7 +289,9 @@ abstract class ElasticsearchDynamicTableFactoryBase
                         PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
                         PARTIAL_CACHE_MAX_ROWS,
                         PARTIAL_CACHE_CACHE_MISSING_KEY,
-                        MAX_RETRIES)
+                        MAX_RETRIES,
+                        INDEX_SUFFIX_FIELD_LENGTH_OPTION,
+                        INDEX_SUFFIX_FIELD_NAME_OPTION)
                 .collect(Collectors.toSet());
     }
 
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
index 92886f4..b628b10 100644
--- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
@@ -86,6 +86,71 @@ final class IndexGeneratorFactory {
         return createIndexGenerator(index, fieldNames, dataTypes, ZoneId.systemDefault());
     }
 
+    /**
+     * Creates an {@link IndexGenerator} from the connector configuration.
+     *
+     * <p>If {@code index.suffix.field.name} is set, the index name is the {@code index} option,
+     * used verbatim as a prefix, followed by the value of that field. Otherwise the {@code index}
+     * option is handled as a static or dynamic index pattern.
+     */
+    public static IndexGenerator createIndexGenerator(
+            ElasticsearchConfiguration config,
+            List<String> fieldNames,
+            List<DataType> dataTypes,
+            ZoneId localTimeZoneId) {
+        if (config.getIndexSuffixFieldName() != null) {
+            return createSuffixIndexGenerator(
+                    config.getIndex(),
+                    config.getIndexSuffixFieldName(),
+                    config.getIndexSuffixFieldLength(),
+                    fieldNames,
+                    dataTypes);
+        } else {
+            return createIndexGenerator(config.getIndex(), fieldNames, dataTypes, localTimeZoneId);
+        }
+    }
+
+    /**
+     * Creates a generator that appends the value of {@code indexSuffixFieldName} to {@code
+     * indexPrefix}. A positive {@code indexSuffixFieldLength} keeps at most that many leading
+     * characters of the value; otherwise the whole value is used. Rows whose suffix field is null
+     * are rejected with a {@link RuntimeException}.
+     *
+     * @throws TableException if {@code indexSuffixFieldName} is not one of {@code fieldNames}
+     */
+    private static IndexGenerator createSuffixIndexGenerator(
+            String indexPrefix,
+            String indexSuffixFieldName,
+            int indexSuffixFieldLength,
+            List<String> fieldNames,
+            List<DataType> fieldTypes) {
+        int indexFieldPos = fieldNames.indexOf(indexSuffixFieldName);
+        if (indexFieldPos < 0) {
+            throw new TableException(
+                    String.format(
+                            "Unknown index field '%s' of '%s', please check the field name.",
+                            indexSuffixFieldName, String.join(",", fieldNames)));
+        }
+        final LogicalType indexFieldType = fieldTypes.get(indexFieldPos).getLogicalType();
+        final RowData.FieldGetter fieldGetter =
+                RowData.createFieldGetter(indexFieldType, indexFieldPos);
+        return row -> {
+            Object fieldOrNull = fieldGetter.getFieldOrNull(row);
+            if (fieldOrNull == null) {
+                throw new RuntimeException(
+                        "Index suffix field " + indexSuffixFieldName + " is null");
+            }
+            final String fieldValue = String.valueOf(fieldOrNull);
+            // A non-positive length disables truncation; values that are already shorter than
+            // the limit are used as-is instead of failing in String#substring.
+            final String indexSuffix =
+                    indexSuffixFieldLength > 0 && fieldValue.length() > indexSuffixFieldLength
+                            ? fieldValue.substring(0, indexSuffixFieldLength)
+                            : fieldValue;
+            return String.format("%s%s", indexPrefix, indexSuffix);
+        };
+    }
+
     interface DynamicFormatter extends Serializable {
         String format(@Nonnull Object fieldValue, DateTimeFormatter formatter);
     }
diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
index 44062a3..acbd4b6 100644
--- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
+++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.elasticsearch.table;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.GenericRowData;
@@ -39,6 +40,9 @@ import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
+import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_LENGTH_OPTION;
+import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_NAME_OPTION;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 import static org.junit.jupiter.api.Assumptions.assumingThat;
@@ -81,7 +85,10 @@ public class IndexGeneratorTest {
                 GenericRowData.of(
                         1,
                         StringData.fromString("apple"),
-                        Timestamp.valueOf("2020-03-18 12:12:14").getTime(),
+                        LocalDateTime.of(2020, 3, 18, 12, 12, 14)
+                                .atZone(ZoneId.of("Asia/Shanghai"))
+                                .toInstant()
+                                .toEpochMilli(),
                         (int) Date.valueOf("2020-03-18").toLocalDate().toEpochDay(),
                         TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-18 12:12:14")),
                         (int)
@@ -100,7 +107,10 @@ public class IndexGeneratorTest {
                 GenericRowData.of(
                         2,
                         StringData.fromString("peanut"),
-                        Timestamp.valueOf("2020-03-19 12:22:14").getTime(),
+                        LocalDateTime.of(2020, 3, 19, 12, 22, 14)
+                                .atZone(ZoneId.of("Asia/Shanghai"))
+                                .toInstant()
+                                .toEpochMilli(),
                         (int) Date.valueOf("2020-03-19").toLocalDate().toEpochDay(),
                         TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-19 12:22:21")),
                         (int)
@@ -367,4 +377,85 @@ public class IndexGeneratorTest {
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage(expectedExceptionMsg);
     }
+
+    @Test
+    public void testSuffixIndexGenerator() {
+        Configuration config = new Configuration();
+        config.set(INDEX_OPTION, "index_");
+        config.set(INDEX_SUFFIX_FIELD_NAME_OPTION, "log_ts");
+        config.set(INDEX_SUFFIX_FIELD_LENGTH_OPTION, 10);
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator(
+                        new ElasticsearchConfiguration(config),
+                        fieldNames,
+                        dataTypes,
+                        ZoneId.systemDefault());
+        // 1584504734000
+        assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("index_1584504734");
+        // 1584591734000
+        assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("index_1584591734");
+    }
+
+    @Test
+    public void testSuffixIndexGeneratorWithoutLengthLimitation() {
+        Configuration config = new Configuration();
+        config.set(INDEX_OPTION, "index_");
+        config.set(INDEX_SUFFIX_FIELD_NAME_OPTION, "log_ts");
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator(
+                        new ElasticsearchConfiguration(config),
+                        fieldNames,
+                        dataTypes,
+                        ZoneId.systemDefault());
+        // 1584504734000
+        assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("index_1584504734000");
+        // 1584591734000
+        assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("index_1584591734000");
+    }
+
+    @Test
+    public void testSuffixIndexGeneratorWithValueShorterThanLength() {
+        Configuration config = new Configuration();
+        config.set(INDEX_OPTION, "index_");
+        config.set(INDEX_SUFFIX_FIELD_NAME_OPTION, "log_ts");
+        config.set(INDEX_SUFFIX_FIELD_LENGTH_OPTION, 20);
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator(
+                        new ElasticsearchConfiguration(config),
+                        fieldNames,
+                        dataTypes,
+                        ZoneId.systemDefault());
+        // The value has only 13 characters, so it must be used as-is instead of failing.
+        assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("index_1584504734000");
+    }
+
+    @Test
+    public void testStaticIndexCompatibility() {
+        Configuration config = new Configuration();
+        config.set(INDEX_OPTION, "my-index");
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator(
+                        new ElasticsearchConfiguration(config),
+                        fieldNames,
+                        dataTypes,
+                        ZoneId.systemDefault());
+        indexGenerator.open();
+        assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index");
+        assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index");
+    }
+
+    @Test
+    public void testDynamicIndexFromDateCompatibility() {
+        Configuration config = new Configuration();
+        config.set(INDEX_OPTION, "my-index-{log_date|yyyy/MM/dd}");
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator(
+                        new ElasticsearchConfiguration(config),
+                        fieldNames,
+                        dataTypes,
+                        ZoneId.systemDefault());
+        indexGenerator.open();
+        assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index-2020/03/18");
+        assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index-2020/03/19");
+    }
 }
