AHeise commented on a change in pull request #17538:
URL: https://github.com/apache/flink/pull/17538#discussion_r740484286



##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorTest.java
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.UnsupportedTemporalTypeException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Suite tests for {@link IndexGenerator}. */
+public class IndexGeneratorTest {
+
+    private List<RowData> rows;
+    private List<String> fieldNames;
+    private List<DataType> dataTypes;
+
+    @BeforeEach
+    public void prepareData() {

Review comment:
       Aren't all variables here constants? Can't we simply initialize them in 
the test?

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorTest.java
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.UnsupportedTemporalTypeException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Suite tests for {@link IndexGenerator}. */
+public class IndexGeneratorTest {
+
+    private List<RowData> rows;
+    private List<String> fieldNames;
+    private List<DataType> dataTypes;
+
+    @BeforeEach
+    public void prepareData() {
+        fieldNames =
+                Stream.of(
+                                "id",
+                                "item",
+                                "log_ts",
+                                "log_date",
+                                "order_timestamp",
+                                "log_time",
+                                "local_datetime",
+                                "local_date",
+                                "local_time",
+                                "note",
+                                "status")
+                        .collect(Collectors.toList());

Review comment:
       ```suggestion
                   Arrays.asList(
                                   "id",
                                   "item",
                                   "log_ts",
                                   "log_date",
                                   "order_timestamp",
                                   "log_time",
                                   "local_datetime",
                                   "local_date",
                                   "local_time",
                                   "note",
                                   "status");
   ```

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
##########
@@ -23,31 +24,21 @@
 
 import org.apache.http.HttpHost;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * Builder to construct a {@link ElasticsearchSink}.
- *
- * <p>The following example shows the minimal setup to create a 
ElasticsearchSink that submits
- * actions on checkpoint or the default number of actions was buffered (1000).
- *
- * <pre>{@code
- * Elasticsearch<String> sink = Elasticsearch
- *     .builder()
- *     .setHosts(MY_ELASTICSEARCH_HOSTS)
- *     .setEmitter(MY_ELASTICSEARCH_EMITTER)
- *     .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
- *     .build();
- * }</pre>
+ * Base builder to construct a {@link ElasticsearchSink}.
  *
  * @param <IN> type of the records converted to Elasticsearch actions
  */
 @PublicEvolving
-public class ElasticsearchSinkBuilder<IN> {
+public abstract class ElasticsearchSinkBuilderBase<IN> implements Serializable 
{

Review comment:
       For base builders, you usually need to capture the builder type as well, e.g. 
`ElasticsearchSinkBuilderBase<IN, B extends ElasticsearchSinkBuilderBase<IN, B>>`, 
in case you want to add ES6 / 7 specific methods later.

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
##########
@@ -80,15 +76,16 @@
      * Sets the emitter which is invoked on every record to convert it to 
Elasticsearch actions.
      *
      * @param emitter to process records into Elasticsearch actions.
-     * @return {@link ElasticsearchSinkBuilder}

Review comment:
       Replace everything with `@return this builder`. You usually link only to 
external types. In this case, the return type is actually a specific builder 
and not just the base, so the link is inaccurate.

##########
File path: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch6SinkBuilder.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Builder to construct an Elasticsearch 6 compatible {@link 
ElasticsearchSink}.
+ *
+ * <p>The following example shows the minimal setup to create a 
ElasticsearchSink that submits
+ * actions on checkpoint or the default number of actions was buffered (1000).
+ *
+ * <pre>{@code
+ * ElasticsearchSink<String> sink = new Elasticsearch6SinkBuilder<String>()
+ *     .setHosts(MY_ELASTICSEARCH_HOSTS)
+ *     .setEmitter(MY_ELASTICSEARCH_EMITTER)

Review comment:
       In general, I think we should have more specific examples for the main 
entry points: There should be an example emitter and we should give an example 
host (the full URL). FYI, @fapaul 

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchDynamicSink.java
##########
@@ -81,21 +106,18 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context 
context) {
 
         final RowElasticsearchEmitter rowElasticsearchEmitter =
                 new RowElasticsearchEmitter(
-                        IndexGeneratorFactory.createIndexGenerator(
-                                config.getIndex(),
-                                DataType.getFieldNames(physicalRowDataType),
-                                
DataType.getFieldDataTypes(physicalRowDataType)),
+                        createIndexGenerator(),
                         format,
                         XContentType.JSON,
-                        KeyExtractor.createKeyExtractor(
-                                primaryKeyLogicalTypesWithIndex, 
config.getKeyDelimiter()));
+                        documentType,
+                        createKeyExtractor());
 
-        final Elasticsearch7SinkBuilder<RowData> builder = new 
Elasticsearch7SinkBuilder<>();
+        final ElasticsearchSinkBuilderBase<RowData> builder = 
builderSupplier.get();
         builder.setEmitter(rowElasticsearchEmitter);
         builder.setHosts(config.getHosts().toArray(new HttpHost[0]));
         builder.setDeliveryGuarantee(config.getDeliveryGuarantee());
         builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
-        builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() 
>> 20));
+        builder.setBulkFlushMaxSizeMb((int) 
(config.getBulkFlushMaxByteSize().getBytes() >> 20));

Review comment:
       Use `getMebiBytes()` here instead of shifting the byte count manually.

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java
##########
@@ -153,19 +119,9 @@
                             "The format must produce a valid JSON document. "
                                     + "Please refer to the documentation on 
formats for more details.");
 
-    // 
--------------------------------------------------------------------------------------------
-    // Enums
-    // 
--------------------------------------------------------------------------------------------
-
-    /**
-     * Backoff strategy. Extends {@link 
ElasticsearchSinkBase.FlushBackoffType} with {@code
-     * DISABLED} option.
-     */
-    public enum BackOffType {
-        DISABLED,
-        CONSTANT,
-        EXPONENTIAL
-    }
-
-    private ElasticsearchConnectorOptions() {}
+    public static final ConfigOption<DeliveryGuarantee> 
DELIVERY_GUARANTEE_OPTION =
+            ConfigOptions.key("sink.delivery-guarantee")
+                    .enumType(DeliveryGuarantee.class)
+                    .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)

Review comment:
       This is different from the default of the DataStream sink. I'd probably favor 
at-least-once anyway, but we should align here.

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/BulkRequestConsumerFactory.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.annotation.Internal;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.io.Serializable;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/**
+ * {@link BulkRequestConsumerFactory} is used to bridge incompatible 
Elasticsearch Java API calls
+ * across different Elasticsearch versions.
+ */
+@Internal
+interface BulkRequestConsumerFactory
+        extends Function<
+                        RestHighLevelClient, BiConsumer<BulkRequest, 
ActionListener<BulkResponse>>>,

Review comment:
       Do you gain anything by extending `Function`? I'd rather go with an 
explicit `create` method.
   An alternative design may be to actually add an explicit `BulkRequestConsumer` 
(or handler?) class and use `SerializableFunction<RestHighLevelClient, 
BulkRequestConsumer>` as the factory.

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/RequestIndexer.java
##########
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0

Review comment:
       Can you create a hotfix with all the license header changes (and 
double-check that yours is correct)?

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchDynamicSink.java
##########
@@ -45,22 +48,44 @@
  * logical description.
  */
 @Internal
-final class Elasticsearch7DynamicSink implements DynamicTableSink {
+class ElasticsearchDynamicSink implements DynamicTableSink {

Review comment:
       Does this mean we get rid of version specific ESDynamicSink in table? :D

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorTest.java
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;

Review comment:
       Yes, please.

##########
File path: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch6SinkBuilder.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Builder to construct an Elasticsearch 6 compatible {@link 
ElasticsearchSink}.
+ *
+ * <p>The following example shows the minimal setup to create a 
ElasticsearchSink that submits
+ * actions on checkpoint or the default number of actions was buffered (1000).
+ *
+ * <pre>{@code
+ * ElasticsearchSink<String> sink = new Elasticsearch6SinkBuilder<String>()

Review comment:
       Let's change that to `ElasticsearchSink<String> sink = 
Elasticsearch6Sink.builder()` to make that consistent with Kafka/Pulsar. 
Probably deserves a separate commit. The builder constructor should not be public.

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java
##########
@@ -165,8 +161,8 @@ private void runTest(
             @Nullable MapFunction<Long, Long> additionalMapper)
             throws Exception {
         final ElasticsearchSink<Tuple2<Integer, String>> sink =
-                ElasticsearchSink.builder()
-                        
.setHosts(HttpHost.create(ES_CONTAINER.getHttpHostAddress()))
+                getSinkBuilder()
+                        
.setHosts(HttpHost.create(getElasticsearchHttpHostAddress()))

Review comment:
       Can we move setting the host to `getSinkBuilder`? Then we have only one 
abstract method.

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
##########
@@ -23,31 +24,21 @@
 
 import org.apache.http.HttpHost;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * Builder to construct a {@link ElasticsearchSink}.
- *
- * <p>The following example shows the minimal setup to create a 
ElasticsearchSink that submits
- * actions on checkpoint or the default number of actions was buffered (1000).
- *
- * <pre>{@code
- * Elasticsearch<String> sink = Elasticsearch
- *     .builder()
- *     .setHosts(MY_ELASTICSEARCH_HOSTS)
- *     .setEmitter(MY_ELASTICSEARCH_EMITTER)
- *     .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
- *     .build();
- * }</pre>
+ * Base builder to construct a {@link ElasticsearchSink}.
  *
  * @param <IN> type of the records converted to Elasticsearch actions
  */
 @PublicEvolving
-public class ElasticsearchSinkBuilder<IN> {
+public abstract class ElasticsearchSinkBuilderBase<IN> implements Serializable 
{

Review comment:
       Why is the builder serializable?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to