fapaul commented on a change in pull request #17374:
URL: https://github.com/apache/flink/pull/17374#discussion_r724161436



##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7Configuration.java
##########
@@ -20,20 +20,91 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.elasticsearch.sink.FlushBackoffType;
 import org.apache.flink.table.api.ValidationException;
 
 import org.apache.http.HttpHost;
 
+import java.time.Duration;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.DELIVERY_GUARANTEE_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.HOSTS_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.PASSWORD_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.USERNAME_OPTION;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Elasticsearch 7 specific configuration. */
 @Internal
-final class Elasticsearch7Configuration extends ElasticsearchConfiguration {
+final class Elasticsearch7Configuration {
+    protected final ReadableConfig config;
+    private final ClassLoader classLoader;
+
     Elasticsearch7Configuration(ReadableConfig config, ClassLoader 
classLoader) {
-        super(config, classLoader);
+        this.config = checkNotNull(config);
+        this.classLoader = checkNotNull(classLoader);
+    }
+
+    public int getBulkFlushMaxActions() {
+        int maxActions = 
config.get(ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+        // convert 0 to -1, because Elasticsearch client use -1 to disable 
this configuration.
+        return maxActions == 0 ? -1 : maxActions;
+    }
+
+    public long getBulkFlushMaxByteSize() {
+        long maxSize =
+                
config.get(ElasticsearchConnectorOptions.BULK_FLASH_MAX_SIZE_OPTION).getBytes();
+        // convert 0 to -1, because Elasticsearch client use -1 to disable 
this configuration.
+        return maxSize == 0 ? -1 : maxSize;
+    }
+
+    public long getBulkFlushInterval() {
+        long interval = config.get(BULK_FLUSH_INTERVAL_OPTION).toMillis();
+        // convert 0 to -1, because Elasticsearch client use -1 to disable 
this configuration.
+        return interval == 0 ? -1 : interval;

Review comment:
       Why do we need these checks?

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java
##########
@@ -109,6 +110,33 @@ public int getIndex() {
                 .orElseGet(() -> (Function<RowData, String> & Serializable) 
(row) -> null);
     }
 
+    static class LogicalTypeWithIndex {

Review comment:
       Please put this class into its own file.

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
##########
@@ -54,54 +44,23 @@
  */
 @Internal
 final class Elasticsearch7DynamicSink implements DynamicTableSink {
-    @VisibleForTesting
-    static final Elasticsearch7RequestFactory REQUEST_FACTORY =
-            new Elasticsearch7DynamicSink.Elasticsearch7RequestFactory();
 
     private final EncodingFormat<SerializationSchema<RowData>> format;
-    private final TableSchema schema;
+    private final DataType physicalRowDataType;
+    private final List<KeyExtractor.LogicalTypeWithIndex> 
primaryKeyLogicalTypesWithIndex;
     private final Elasticsearch7Configuration config;
 
-    public Elasticsearch7DynamicSink(
-            EncodingFormat<SerializationSchema<RowData>> format,
-            Elasticsearch7Configuration config,
-            TableSchema schema) {
-        this(format, config, schema, (ElasticsearchSink.Builder::new));
-    }
-
-    // --------------------------------------------------------------
-    // Hack to make configuration testing possible.
-    //
-    // The code in this block should never be used outside of tests.
-    // Having a way to inject a builder we can assert the builder in
-    // the test. We can not assert everything though, e.g. it is not
-    // possible to assert flushing on checkpoint, as it is configured
-    // on the sink itself.
-    // --------------------------------------------------------------
-
-    private final ElasticSearchBuilderProvider builderProvider;
-
-    @FunctionalInterface
-    interface ElasticSearchBuilderProvider {
-        ElasticsearchSink.Builder<RowData> createBuilder(
-                List<HttpHost> httpHosts, RowElasticsearchSinkFunction 
upsertSinkFunction);
-    }
-
     Elasticsearch7DynamicSink(
             EncodingFormat<SerializationSchema<RowData>> format,
-            Elasticsearch7Configuration config,
-            TableSchema schema,
-            ElasticSearchBuilderProvider builderProvider) {
+            DataType physicalRowDataType,
+            List<KeyExtractor.LogicalTypeWithIndex> 
primaryKeyLogicalTypesWithIndex,
+            Elasticsearch7Configuration config) {
         this.format = format;
-        this.schema = schema;
+        this.physicalRowDataType = physicalRowDataType;
+        this.primaryKeyLogicalTypesWithIndex = primaryKeyLogicalTypesWithIndex;
         this.config = config;

Review comment:
       please add `checkNotNull` statements

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
##########
@@ -63,25 +64,44 @@
     private static final Set<ConfigOption<?>> optionalOptions =
             Stream.of(
                             KEY_DELIMITER_OPTION,
-                            FAILURE_HANDLER_OPTION,
-                            FLUSH_ON_CHECKPOINT_OPTION,
                             BULK_FLASH_MAX_SIZE_OPTION,
                             BULK_FLUSH_MAX_ACTIONS_OPTION,
                             BULK_FLUSH_INTERVAL_OPTION,
                             BULK_FLUSH_BACKOFF_TYPE_OPTION,
                             BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
                             BULK_FLUSH_BACKOFF_DELAY_OPTION,
-                            CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
-                            CONNECTION_PATH_PREFIX,
+                            CONNECTION_PATH_PREFIX_OPTION,
                             FORMAT_OPTION,
+                            DELIVERY_GUARANTEE_OPTION,
                             PASSWORD_OPTION,
                             USERNAME_OPTION)
                     .collect(Collectors.toSet());
 
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
-        TableSchema tableSchema = context.getCatalogTable().getSchema();
-        ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
+        DataType physicalRowDataType = context.getPhysicalRowDataType();
+        int[] primaryKeyIndexes = context.getPrimaryKeyIndexes();
+        if (primaryKeyIndexes.length != 0) {
+            DataType pkDataType = DataType.projectFields(physicalRowDataType, 
primaryKeyIndexes);
+
+            ElasticsearchValidationUtils.validatePrimaryKey(pkDataType);
+        }
+
+        List<KeyExtractor.LogicalTypeWithIndex> 
primaryKeyLogicalTypesWithIndex =
+                Arrays.stream(primaryKeyIndexes)
+                        .mapToObj(
+                                index -> {
+                                    
@SuppressWarnings("OptionalGetWithoutIsPresent")
+                                    Column column =
+                                            context.getCatalogTable()
+                                                    .getResolvedSchema()

Review comment:
       Nit: extract a variable before the loop.

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
##########
@@ -63,25 +64,44 @@
     private static final Set<ConfigOption<?>> optionalOptions =
             Stream.of(
                             KEY_DELIMITER_OPTION,
-                            FAILURE_HANDLER_OPTION,
-                            FLUSH_ON_CHECKPOINT_OPTION,
                             BULK_FLASH_MAX_SIZE_OPTION,
                             BULK_FLUSH_MAX_ACTIONS_OPTION,
                             BULK_FLUSH_INTERVAL_OPTION,
                             BULK_FLUSH_BACKOFF_TYPE_OPTION,
                             BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
                             BULK_FLUSH_BACKOFF_DELAY_OPTION,
-                            CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
-                            CONNECTION_PATH_PREFIX,
+                            CONNECTION_PATH_PREFIX_OPTION,
                             FORMAT_OPTION,
+                            DELIVERY_GUARANTEE_OPTION,
                             PASSWORD_OPTION,
                             USERNAME_OPTION)
                     .collect(Collectors.toSet());
 
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
-        TableSchema tableSchema = context.getCatalogTable().getSchema();
-        ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
+        DataType physicalRowDataType = context.getPhysicalRowDataType();
+        int[] primaryKeyIndexes = context.getPrimaryKeyIndexes();
+        if (primaryKeyIndexes.length != 0) {
+            DataType pkDataType = DataType.projectFields(physicalRowDataType, 
primaryKeyIndexes);
+
+            ElasticsearchValidationUtils.validatePrimaryKey(pkDataType);
+        }
+
+        List<KeyExtractor.LogicalTypeWithIndex> 
primaryKeyLogicalTypesWithIndex =
+                Arrays.stream(primaryKeyIndexes)
+                        .mapToObj(
+                                index -> {
+                                    
@SuppressWarnings("OptionalGetWithoutIsPresent")

Review comment:
       Although the column should always be present, IMO we can handle the case 
explicitly to give a better error message and to get rid of the suppression.
   
   My previous comment was only semi-complete: when using the annotation, we 
always put it above the method and not above a certain call.

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7ConnectorOptions.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.elasticsearch.sink.FlushBackoffType;
+
+import java.time.Duration;
+import java.util.List;
+
+/** Options for the Elasticsearch connector. */
+class Elasticsearch7ConnectorOptions {

Review comment:
       I think we always mark `ConnectorOptions` as `@PublicEvolving`. @twalthr 
is there some deeper reason behind this, or is it just to signal to users that 
these options will remain stable?

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7Configuration.java
##########
@@ -20,20 +20,91 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.elasticsearch.sink.FlushBackoffType;
 import org.apache.flink.table.api.ValidationException;
 
 import org.apache.http.HttpHost;
 
+import java.time.Duration;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.DELIVERY_GUARANTEE_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.HOSTS_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.PASSWORD_OPTION;
+import static 
org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7ConnectorOptions.USERNAME_OPTION;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Elasticsearch 7 specific configuration. */
 @Internal
-final class Elasticsearch7Configuration extends ElasticsearchConfiguration {
+final class Elasticsearch7Configuration {
+    protected final ReadableConfig config;
+    private final ClassLoader classLoader;

Review comment:
       Is the classloader unused?

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
##########
@@ -114,188 +73,65 @@ public ChangelogMode getChangelogMode(ChangelogMode 
requestedMode) {
     }
 
     @Override
-    public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
-        return () -> {
-            SerializationSchema<RowData> format =
-                    this.format.createRuntimeEncoder(context, 
schema.toRowDataType());
-
-            final RowElasticsearchSinkFunction upsertFunction =
-                    new RowElasticsearchSinkFunction(
-                            
IndexGeneratorFactory.createIndexGenerator(config.getIndex(), schema),
-                            null, // this is deprecated in es 7+
-                            format,
-                            XContentType.JSON,
-                            REQUEST_FACTORY,
-                            KeyExtractor.createKeyExtractor(schema, 
config.getKeyDelimiter()));
-
-            final ElasticsearchSink.Builder<RowData> builder =
-                    builderProvider.createBuilder(config.getHosts(), 
upsertFunction);
-
-            builder.setFailureHandler(config.getFailureHandler());
-            builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
-            builder.setBulkFlushMaxSizeMb((int) 
(config.getBulkFlushMaxByteSize() >> 20));
-            builder.setBulkFlushInterval(config.getBulkFlushInterval());
-            builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
-            
config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
-            
config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
-            
config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
-
-            // we must overwrite the default factory which is defined with a 
lambda because of a bug
-            // in shading lambda serialization shading see FLINK-18006
-            if (config.getUsername().isPresent()
-                    && config.getPassword().isPresent()
-                    && 
!StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
-                    && 
!StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
-                builder.setRestClientFactory(
-                        new AuthRestClientFactory(
-                                config.getPathPrefix().orElse(null),
-                                config.getUsername().get(),
-                                config.getPassword().get()));
-            } else {
-                builder.setRestClientFactory(
-                        new 
DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
-            }
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        SerializationSchema<RowData> format =
+                this.format.createRuntimeEncoder(context, physicalRowDataType);
+
+        final RowElasticsearchEmitter rowElasticsearchEmitter =
+                new RowElasticsearchEmitter(
+                        IndexGeneratorFactory.createIndexGenerator(
+                                config.getIndex(),
+                                DataType.getFieldNames(physicalRowDataType),
+                                
DataType.getFieldDataTypes(physicalRowDataType)),
+                        format,
+                        XContentType.JSON,
+                        KeyExtractor.createKeyExtractor(
+                                primaryKeyLogicalTypesWithIndex, 
config.getKeyDelimiter()));
+
+        final ElasticsearchSinkBuilder<RowData> builder = 
ElasticsearchSink.builder();
+        builder.setEmitter(rowElasticsearchEmitter);
+        builder.setHosts(config.getHosts().toArray(new HttpHost[0]));
+        builder.setDeliveryGuarantee(config.getDeliveryGuarantee());
+        builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+        builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() 
>> 20));
+        builder.setBulkFlushInterval(config.getBulkFlushInterval());
+
+        if (config.getBulkFlushBackoffType().isPresent()) {
+            FlushBackoffType backoffType = 
config.getBulkFlushBackoffType().get();
+            int backoffMaxRetries = config.getBulkFlushBackoffRetries().get();
+            long backoffDelayMs = config.getBulkFlushBackoffDelay().get();
+
+            builder.setBulkFlushBackoffStrategy(backoffType, 
backoffMaxRetries, backoffDelayMs);
+        }
 
-            final ElasticsearchSink<RowData> sink = builder.build();
+        if (config.getUsername().isPresent()
+                && config.getPassword().isPresent()
+                && 
!StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
+                && 
!StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
+            builder.setConnectionPassword(config.getPassword().get());
+            builder.setConnectionUsername(config.getUsername().get());
+        }
 
-            if (config.isDisableFlushOnCheckpoint()) {
-                sink.disableFlushOnCheckpoint();
-            }
+        if (config.getPathPrefix().isPresent()
+                && 
!StringUtils.isNullOrWhitespaceOnly(config.getPathPrefix().get())) {
+            builder.setConnectionPathPrefix(config.getPathPrefix().get());
+        }
 
-            return sink;
-        };
+        final ElasticsearchSink<RowData> sink = builder.build();

Review comment:
       Nit: inline variable

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java
##########
@@ -109,6 +110,33 @@ public int getIndex() {
                 .orElseGet(() -> (Function<RowData, String> & Serializable) 
(row) -> null);
     }
 
+    static class LogicalTypeWithIndex {

Review comment:
       I am, in general, not a big fan of exposing nested classes. Can you put 
this class in a separate file?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to