This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 14b3c483c72e17dd1bcad3508bcd243f8c41e327 Author: liuyongvs <liuyon...@gmail.com> AuthorDate: Tue Jun 9 16:40:16 2020 +0800 [FLINK-18208] Fix flink ES connector typos This closes #12543 --- .../elasticsearch/table/Elasticsearch6DynamicSink.java | 4 ++-- .../table/Elasticsearch7DynamicSinkFactory.java | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java index 680cb2c..01b5f47 100644 --- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java +++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java @@ -50,7 +50,7 @@ import java.util.Objects; @PublicEvolving final class Elasticsearch6DynamicSink implements DynamicTableSink { @VisibleForTesting - static final Elasticsearch7RequestFactory REQUEST_FACTORY = new Elasticsearch7RequestFactory(); + static final Elasticsearch6RequestFactory REQUEST_FACTORY = new Elasticsearch6RequestFactory(); private final EncodingFormat<SerializationSchema<RowData>> format; private final TableSchema schema; @@ -200,7 +200,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink { /** * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the sink. */ - private static class Elasticsearch7RequestFactory implements RequestFactory { + private static class Elasticsearch6RequestFactory implements RequestFactory { @Override public UpdateRequest createUpdateRequest( String index, diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java index ae7a9fd..320c894 100644 --- a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java +++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java @@ -94,7 +94,7 @@ public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory .forEach(configuration::setString); Elasticsearch7Configuration config = new Elasticsearch7Configuration(configuration, context.getClassLoader()); - validateOptions(config, configuration); + validate(config, configuration); return new Elasticsearch7DynamicSink( format, @@ -102,27 +102,27 @@ public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory TableSchemaUtils.getPhysicalSchema(tableSchema)); } - private void validateOptions(Elasticsearch7Configuration config, Configuration originalConfiguration) { + private void validate(Elasticsearch7Configuration config, Configuration originalConfiguration) { config.getFailureHandler(); // checks if we can instantiate the custom failure handler config.getHosts(); // validate hosts - validateOptions( + validate( config.getIndex().length() >= 1, () -> String.format("'%s' must not be empty", INDEX_OPTION.key())); - validateOptions( + validate( config.getBulkFlushMaxActions().map(maxActions -> maxActions >= 1).orElse(true), () -> String.format( "'%s' must be at least 1 character. Got: %s", BULK_FLUSH_MAX_ACTIONS_OPTION.key(), config.getBulkFlushMaxActions().get()) ); - validateOptions( + validate( config.getBulkFlushMaxSize().map(maxSize -> maxSize >= 1024 * 1024).orElse(true), () -> String.format( "'%s' must be at least 1mb character. Got: %s", BULK_FLASH_MAX_SIZE_OPTION.key(), originalConfiguration.get(BULK_FLASH_MAX_SIZE_OPTION).toHumanReadableString()) ); - validateOptions( + validate( config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true), () -> String.format( "'%s' must be at least 1. Got: %s", @@ -131,7 +131,7 @@ public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory ); } - private static void validateOptions(boolean condition, Supplier<String> message) { + private static void validate(boolean condition, Supplier<String> message) { if (!condition) { throw new ValidationException(message.get()); }