afedulov commented on a change in pull request #18153:
URL: https://github.com/apache/flink/pull/18153#discussion_r798750480



##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/common/ElasticsearchUtil.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.common;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.client.RestClientBuilder;
+
+/** Collection of utility methods for the Elasticsearch source and sink. */
+@Internal
+public class ElasticsearchUtil {
+
+    public static RestClientBuilder configureRestClientBuilder(
+            RestClientBuilder builder, NetworkClientConfig config) {
+        if (config.getConnectionPathPrefix() != null) {

Review comment:
       Nit: add a null check for `config` (e.g. `checkNotNull(config)`).

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/common/ElasticsearchUtil.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.common;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.client.RestClientBuilder;
+
+/** Collection of utility methods for the Elasticsearch source and sink. */
+@Internal
+public class ElasticsearchUtil {
+
+    public static RestClientBuilder configureRestClientBuilder(
+            RestClientBuilder builder, NetworkClientConfig config) {
+        if (config.getConnectionPathPrefix() != null) {
+            builder.setPathPrefix(config.getConnectionPathPrefix());

Review comment:
       Nit: add a null check for `builder` (e.g. `checkNotNull(builder)`).

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7Source.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumState;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumStateSerializer;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumerator;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SourceReader;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7SplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+/**
+ * The Source implementation for Elasticsearch. Please use a {@link 
Elasticsearch7SourceBuilder} to
+ * construct a {@link Elasticsearch7Source}. The following example shows how 
to create a
+ * ElasticsearchSource emitting records of <code>

Review comment:
       ```suggestion
    * ElasticsearchSource emitting records of {@code
   ```

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7Source.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumState;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumStateSerializer;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumerator;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SourceReader;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7SplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+/**
+ * The Source implementation for Elasticsearch. Please use a {@link 
Elasticsearch7SourceBuilder} to
+ * construct a {@link Elasticsearch7Source}. The following example shows how 
to create a
+ * ElasticsearchSource emitting records of <code>
+ * String</code> type.

Review comment:
       ```suggestion
    * String} type.
   ```

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/enumerator/Elasticsearch7SourceEnumStateSerializer.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.enumerator;
+
+import org.apache.flink.annotation.PublicEvolving;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * The {@link org.apache.flink.core.io.SimpleVersionedSerializer Serializer} 
for the enumerator
+ * state of Elasticsearch source.

Review comment:
       ```suggestion
    * state of the Elasticsearch source.
   ```

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/common/NetworkClientConfig.java
##########
@@ -16,13 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.elasticsearch.sink;
+package org.apache.flink.connector.elasticsearch.common;
+
+import org.apache.flink.annotation.Internal;
 
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
 
-class NetworkClientConfig implements Serializable {
+/** This class encapsulates information on how to connect against an 
Elasticsearch cluster. */

Review comment:
       ```suggestion
   /** This class encapsulates information on how to connect to an 
Elasticsearch cluster. */
   ```

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSource.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.elasticsearch.source.Elasticsearch7Source;
+import 
org.apache.flink.connector.elasticsearch.source.Elasticsearch7SourceBuilder;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.http.HttpHost;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** An Elasticsearch {@link ScanTableSource}. */
+@Internal
+public class Elasticsearch7DynamicSource implements ScanTableSource {

Review comment:
       `Dynamic` without a reference to `Table` makes the naming somewhat confusing. 
I believe it should be consistent with existing usage, e.g. 
`HBaseDynamicTableSource` and `JdbcDynamicTableSource`. The same applies to the 
related configuration classes, etc.

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/RowDataElasticsearch7SearchHitDeserializationSchema.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonToRowDataConverters;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.Collector;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.elasticsearch.search.SearchHit;
+
+import java.io.IOException;
+
+/** A {@link Elasticsearch7SearchHitDeserializationSchema} that deserializes 
to {@link RowData}. */
+public class RowDataElasticsearch7SearchHitDeserializationSchema
+        implements Elasticsearch7SearchHitDeserializationSchema<RowData> {
+
+    /** TypeInformation of the produced {@link RowData}. */
+    private final TypeInformation<RowData> producedTypeInformation;
+
+    /** Flag indicating whether to fail if a field is missing. */
+    private final boolean failOnMissingField;
+
+    /** Flag indicating whether to ignore invalid fields/rows (default: throw 
an exception). */
+    private final boolean ignoreParseErrors;
+
+    /** Timestamp format specification which is used to parse timestamp. */
+    private final TimestampFormat timestampFormat;
+
+    /**
+     * Runtime converter that converts {@link JsonNode}s into objects of Flink 
SQL internal data
+     * structures.
+     */
+    private final JsonToRowDataConverters.JsonToRowDataConverter 
runtimeConverter;
+
+    /** Object mapper for parsing the JSON. */
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    RowDataElasticsearch7SearchHitDeserializationSchema(
+            RowType rowType,
+            TypeInformation<RowData> producedTypeInformation,
+            boolean failOnMissingField,
+            boolean ignoreParseErrors,
+            TimestampFormat timestampFormat) {
+        this.producedTypeInformation = producedTypeInformation;
+        this.failOnMissingField = failOnMissingField;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.timestampFormat = timestampFormat;
+
+        if (ignoreParseErrors && failOnMissingField) {
+            throw new IllegalArgumentException(
+                    "JSON format doesn't support failOnMissingField and 
ignoreParseErrors are both enabled.");
+        }
+
+        this.runtimeConverter =
+                new JsonToRowDataConverters(failOnMissingField, 
ignoreParseErrors, timestampFormat)
+                        .createConverter(rowType);
+
+        boolean hasDecimalType =
+                LogicalTypeChecks.hasNested(rowType, t -> t instanceof 
DecimalType);
+        if (hasDecimalType) {
+            
objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);

Review comment:
       Just curious, what is the rationale for not enabling this by default? 
Memory usage?

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7Source.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumState;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumStateSerializer;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumerator;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SourceReader;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7SplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+/**
+ * The Source implementation for Elasticsearch. Please use a {@link 
Elasticsearch7SourceBuilder} to
+ * construct a {@link Elasticsearch7Source}. The following example shows how 
to create a
+ * ElasticsearchSource emitting records of <code>
+ * String</code> type.
+ *
+ * <pre>{@code
+ * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder()
+ *     .setHosts(new HttpHost("localhost:9200")
+ *     .setIndexName("my-index")
+ *     .setDeserializationSchema(new 
Elasticsearch7SearchHitDeserializationSchema<String>() {
+ *          @Override
+ *          public void deserialize(SearchHit record, Collector<String> out) 
throws IOException {
+ *              out.collect(record.getSourceAsString());
+ *          }
+ *
+ *          @Override
+ *          public TypeInformation<String> getProducedType() {
+ *              return TypeInformation.of(String.class);
+ *          }
+ *      })
+ *     .build();
+ * }</pre>
+ *
+ * <p>The ElasticsearchSource runs in a {@link Boundedness#BOUNDED} mode and 
stops when the entire *
+ * index has been read.
+ *
+ * <p>See {@link Elasticsearch7SourceBuilder} for more details.
+ *
+ * @param <OUT> the output type of the source.
+ */
+@PublicEvolving
+public class Elasticsearch7Source<OUT>
+        implements Source<OUT, Elasticsearch7Split, 
Elasticsearch7SourceEnumState>,
+                ResultTypeQueryable<OUT> {
+
+    private final Elasticsearch7SearchHitDeserializationSchema<OUT> 
deserializationSchema;
+    private final Elasticsearch7SourceConfiguration sourceConfiguration;
+    private final NetworkClientConfig networkClientConfig;
+
+    Elasticsearch7Source(
+            Elasticsearch7SearchHitDeserializationSchema<OUT> 
deserializationSchema,
+            Elasticsearch7SourceConfiguration sourceConfiguration,
+            NetworkClientConfig networkClientConfig) {
+        this.deserializationSchema = deserializationSchema;
+        this.sourceConfiguration = sourceConfiguration;
+        this.networkClientConfig = networkClientConfig;
+    }
+
+    /**
+     * Create an {@link Elasticsearch7SourceBuilder} to construct a new {@link

Review comment:
       ```suggestion
        * Creates an {@link Elasticsearch7SourceBuilder} for constructing a new 
{@link
   ```

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7Source.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumState;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumStateSerializer;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumerator;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SourceReader;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7SplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+/**
+ * The Source implementation for Elasticsearch. Please use a {@link 
Elasticsearch7SourceBuilder} to
+ * construct a {@link Elasticsearch7Source}. The following example shows how 
to create a
+ * ElasticsearchSource emitting records of <code>
+ * String</code> type.
+ *
+ * <pre>{@code
+ * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder()
+ *     .setHosts(new HttpHost("localhost:9200")
+ *     .setIndexName("my-index")
+ *     .setDeserializationSchema(new 
Elasticsearch7SearchHitDeserializationSchema<String>() {
+ *          @Override
+ *          public void deserialize(SearchHit record, Collector<String> out) 
throws IOException {
+ *              out.collect(record.getSourceAsString());
+ *          }
+ *
+ *          @Override
+ *          public TypeInformation<String> getProducedType() {
+ *              return TypeInformation.of(String.class);
+ *          }
+ *      })
+ *     .build();
+ * }</pre>
+ *
+ * <p>The ElasticsearchSource runs in a {@link Boundedness#BOUNDED} mode and 
stops when the entire *
+ * index has been read.
+ *
+ * <p>See {@link Elasticsearch7SourceBuilder} for more details.
+ *
+ * @param <OUT> the output type of the source.
+ */
+@PublicEvolving
+public class Elasticsearch7Source<OUT>
+        implements Source<OUT, Elasticsearch7Split, 
Elasticsearch7SourceEnumState>,
+                ResultTypeQueryable<OUT> {
+
+    private final Elasticsearch7SearchHitDeserializationSchema<OUT> 
deserializationSchema;
+    private final Elasticsearch7SourceConfiguration sourceConfiguration;
+    private final NetworkClientConfig networkClientConfig;
+
+    Elasticsearch7Source(
+            Elasticsearch7SearchHitDeserializationSchema<OUT> 
deserializationSchema,

Review comment:
       Nit: add null checks for the constructor parameters.

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7Source.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumState;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumStateSerializer;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumerator;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SourceReader;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7SplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+/**
+ * The Source implementation for Elasticsearch. Please use a {@link 
Elasticsearch7SourceBuilder} to
+ * construct a {@link Elasticsearch7Source}. The following example shows how 
to create a
+ * ElasticsearchSource emitting records of <code>
+ * String</code> type.
+ *
+ * <pre>{@code
+ * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder()
+ *     .setHosts(new HttpHost("localhost:9200")
+ *     .setIndexName("my-index")
+ *     .setDeserializationSchema(new 
Elasticsearch7SearchHitDeserializationSchema<String>() {
+ *          @Override
+ *          public void deserialize(SearchHit record, Collector<String> out) 
throws IOException {
+ *              out.collect(record.getSourceAsString());
+ *          }
+ *
+ *          @Override
+ *          public TypeInformation<String> getProducedType() {
+ *              return TypeInformation.of(String.class);
+ *          }
+ *      })
+ *     .build();
+ * }</pre>
+ *
+ * <p>The ElasticsearchSource runs in a {@link Boundedness#BOUNDED} mode and 
stops when the entire *

Review comment:
       ```suggestion
    * <p>The ElasticsearchSource runs in a {@link Boundedness#BOUNDED} mode and 
stops when the entire
   ```

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7Source.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumState;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumStateSerializer;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumerator;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SourceReader;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7SplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+/**
+ * The Source implementation for Elasticsearch. Please use a {@link 
Elasticsearch7SourceBuilder} to
+ * construct a {@link Elasticsearch7Source}. The following example shows how 
to create a
+ * ElasticsearchSource emitting records of <code>
+ * String</code> type.
+ *
+ * <pre>{@code
+ * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder()

Review comment:
       A couple of things seem to be off:
   
   -  `new` is not needed (builder() is a static method)
   - `setHosts` line is missing a parenthesis
   It would be worth checking that the example compiles outside of the docs.

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7Source.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumState;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumStateSerializer;
+import 
org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumerator;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SourceReader;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7SplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+/**
+ * The Source implementation for Elasticsearch. Please use a {@link 
Elasticsearch7SourceBuilder} to
+ * construct a {@link Elasticsearch7Source}. The following example shows how 
to create a

Review comment:
       ```suggestion
    * construct an {@link Elasticsearch7Source}. The following example shows 
how to create an
   ```

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7SourceBuilder.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema;
+
+import org.apache.http.HttpHost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The @builder class for {@link Elasticsearch7Source}.
+ *
+ * <p>The following example shows the minimum setup to create a 
ElasticsearchSource that reads the
+ * String values from an Elasticsearch index.
+ *
+ * <pre>{@code
+ * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder()

Review comment:
       Same as above:
   - `new` is not needed (builder() is a static method)
   - `setHosts` line is missing a parenthesis
   - check that the example compiles outside of the docs

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7SourceBuilder.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema;
+
+import org.apache.http.HttpHost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The @builder class for {@link Elasticsearch7Source}.
+ *
+ * <p>The following example shows the minimum setup to create a 
ElasticsearchSource that reads the
+ * String values from an Elasticsearch index.
+ *
+ * <pre>{@code
+ * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder()
+ *     .setHosts(new HttpHost("localhost:9200")
+ *     .setIndexName("my-index")
+ *     .setDeserializationSchema(new 
Elasticsearch7SearchHitDeserializationSchema<String>() {
+ *          @Override
+ *          public void deserialize(SearchHit record, Collector<String> out) 
throws IOException {
+ *              out.collect(record.getSourceAsString());
+ *          }
+ *
+ *          @Override
+ *          public TypeInformation<String> getProducedType() {
+ *              return TypeInformation.of(String.class);
+ *          }
+ *      })
+ *     .build();
+ * }</pre>
+ *
+ * <p>The ElasticsearchSource runs in a {@link Boundedness#BOUNDED} mode and 
stops when the entire
+ * index has been read.
+ *
+ * <p>Check the Java docs of each individual methods to learn more about the 
settings to build a
+ * ElasticsearchSource.
+ */
+@PublicEvolving
+public class Elasticsearch7SourceBuilder<OUT> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(Elasticsearch7SourceBuilder.class);
+
+    private Duration pitKeepAlive = Duration.ofMinutes(5);
+    private int numberOfSearchSlices = 1;
+    private String indexName;
+    private Elasticsearch7SearchHitDeserializationSchema<OUT> 
deserializationSchema;
+
+    private List<HttpHost> hosts;
+    private String username;
+    private String password;
+    private String connectionPathPrefix;
+    private Integer connectionTimeout;
+    private Integer socketTimeout;
+    private Integer connectionRequestTimeout;
+
+    Elasticsearch7SourceBuilder() {}
+
+    private boolean isGreaterOrEqual(Duration d1, Duration d2) {
+        return ((d1.compareTo(d2) >= 0));
+    }
+
+    /**
+     * Sets the keep alive duration for the Point-in-Time (PIT). The PIT is 
required to a snapshot
+     * of the data in the index, to avoid reading the same data on subsequent 
search calls. If the
+     * PIT has expired the source will not be able to read data from this 
snapshot again. This also
+     * means that recovering from a checkpoint whose PIT has expired is not 
possible.
+     *
+     * @param pitKeepAlive duration of the PIT keep alive
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setPitKeepAlive(Duration 
pitKeepAlive) {
+        checkNotNull(pitKeepAlive);
+        checkArgument(
+                isGreaterOrEqual(pitKeepAlive, Duration.ofMinutes(5)),
+                "PIT keep alive should be at least 5 minutes.");
+        this.pitKeepAlive = pitKeepAlive;
+        return this;
+    }
+
+    /**
+     * Sets the number of search slices to be used to read the index. The 
number of search slices
+     * should be a multiple of the number of shards in your Elasticsearch 
cluster.
+     *
+     * @param numberOfSearchSlices the number of search slices
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setNumberOfSearchSlices(int 
numberOfSearchSlices) {
+        checkArgument(numberOfSearchSlices > 0, "Number of search slices must 
be greater than 0.");
+        this.numberOfSearchSlices = numberOfSearchSlices;
+        return this;
+    }
+
+    /**
+     * Sets the name of the Elasticsearch index to be read.
+     *
+     * @param indexName name of the Elasticsearch index
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setIndexName(String indexName) {
+        checkNotNull(indexName);
+        this.indexName = indexName;
+        return this;
+    }
+
+    /**
+     * Sets the {@link Elasticsearch7SearchHitDeserializationSchema} for the 
Elasticsearch source.
+     * The given schema will be used to deserialize {@link 
org.elasticsearch.search.SearchHit}s into
+     * the output type.
+     *
+     * @param deserializationSchema the deserialization schema to use
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setDeserializationSchema(
+            Elasticsearch7SearchHitDeserializationSchema<OUT> 
deserializationSchema) {
+        checkNotNull(deserializationSchema);
+        this.deserializationSchema = deserializationSchema;
+        return this;
+    }
+
+    /**
+     * Sets the hosts where the Elasticsearch cluster nodes are reachable.
+     *
+     * @param hosts http addresses describing the node locations
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setHosts(HttpHost... hosts) {
+        checkNotNull(hosts);
+        checkState(hosts.length > 0, "Hosts cannot be empty.");
+        this.hosts = Arrays.asList(hosts);
+        return this;
+    }
+
+    /**
+     * Sets the username used to authenticate the connection with the 
Elasticsearch cluster.
+     *
+     * @param username of the Elasticsearch cluster user
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setConnectionUsername(String 
username) {
+        checkNotNull(username);
+        this.username = username;
+        return this;
+    }
+
+    /**
+     * Sets the password used to authenticate the connection with the 
Elasticsearch cluster.
+     *
+     * @param password of the Elasticsearch cluster user
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setConnectionPassword(String 
password) {
+        checkNotNull(password);
+        this.password = password;
+        return this;
+    }
+
+    /**
+     * Sets a prefix which used for every REST communication to the 
Elasticsearch cluster.
+     *
+     * @param prefix for the communication
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setConnectionPathPrefix(String 
prefix) {
+        checkNotNull(prefix);
+        this.connectionPathPrefix = prefix;
+        return this;
+    }
+
+    /**
+     * Sets the timeout for requesting the connection of the Elasticsearch 
cluster from the
+     * connection manager.
+     *
+     * @param connectionRequestTimeout tiemout for the connection request
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setConnectionRequestTimeout(
+            int connectionRequestTimeout) {
+        checkState(
+                connectionRequestTimeout >= 0,
+                "Connection request timeout must be larger than or equal to 
0.");
+        this.connectionRequestTimeout = connectionRequestTimeout;
+        return this;
+    }
+
+    /**
+     * Sets the timeout for establishing a connection of the Elasticsearch 
cluster.

Review comment:
       ```suggestion
        * Sets the timeout for establishing a connection to the Elasticsearch 
cluster.
   ```

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7SourceBuilder.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema;
+
+import org.apache.http.HttpHost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The @builder class for {@link Elasticsearch7Source}.
+ *
+ * <p>The following example shows the minimum setup to create a 
ElasticsearchSource that reads the
+ * String values from an Elasticsearch index.
+ *
+ * <pre>{@code
+ * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder()
+ *     .setHosts(new HttpHost("localhost:9200")
+ *     .setIndexName("my-index")
+ *     .setDeserializationSchema(new 
Elasticsearch7SearchHitDeserializationSchema<String>() {
+ *          @Override
+ *          public void deserialize(SearchHit record, Collector<String> out) 
throws IOException {
+ *              out.collect(record.getSourceAsString());
+ *          }
+ *
+ *          @Override
+ *          public TypeInformation<String> getProducedType() {
+ *              return TypeInformation.of(String.class);
+ *          }
+ *      })
+ *     .build();
+ * }</pre>
+ *
+ * <p>The ElasticsearchSource runs in a {@link Boundedness#BOUNDED} mode and 
stops when the entire
+ * index has been read.
+ *
+ * <p>Check the Java docs of each individual methods to learn more about the 
settings to build a
+ * ElasticsearchSource.
+ */
+@PublicEvolving
+public class Elasticsearch7SourceBuilder<OUT> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(Elasticsearch7SourceBuilder.class);
+
+    private Duration pitKeepAlive = Duration.ofMinutes(5);
+    private int numberOfSearchSlices = 1;
+    private String indexName;
+    private Elasticsearch7SearchHitDeserializationSchema<OUT> 
deserializationSchema;
+
+    private List<HttpHost> hosts;
+    private String username;
+    private String password;
+    private String connectionPathPrefix;
+    private Integer connectionTimeout;
+    private Integer socketTimeout;
+    private Integer connectionRequestTimeout;
+
+    Elasticsearch7SourceBuilder() {}
+
+    private boolean isGreaterOrEqual(Duration d1, Duration d2) {
+        return ((d1.compareTo(d2) >= 0));
+    }
+
+    /**
+     * Sets the keep alive duration for the Point-in-Time (PIT). The PIT is 
required to a snapshot
+     * of the data in the index, to avoid reading the same data on subsequent 
search calls. If the
+     * PIT has expired the source will not be able to read data from this 
snapshot again. This also
+     * means that recovering from a checkpoint whose PIT has expired is not 
possible.
+     *
+     * @param pitKeepAlive duration of the PIT keep alive
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setPitKeepAlive(Duration 
pitKeepAlive) {
+        checkNotNull(pitKeepAlive);
+        checkArgument(
+                isGreaterOrEqual(pitKeepAlive, Duration.ofMinutes(5)),
+                "PIT keep alive should be at least 5 minutes.");
+        this.pitKeepAlive = pitKeepAlive;
+        return this;
+    }
+
+    /**
+     * Sets the number of search slices to be used to read the index. The 
number of search slices
+     * should be a multiple of the number of shards in your Elasticsearch 
cluster.
+     *
+     * @param numberOfSearchSlices the number of search slices
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setNumberOfSearchSlices(int 
numberOfSearchSlices) {
+        checkArgument(numberOfSearchSlices > 0, "Number of search slices must 
be greater than 0.");
+        this.numberOfSearchSlices = numberOfSearchSlices;
+        return this;
+    }
+
+    /**
+     * Sets the name of the Elasticsearch index to be read.
+     *
+     * @param indexName name of the Elasticsearch index
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setIndexName(String indexName) {
+        checkNotNull(indexName);
+        this.indexName = indexName;
+        return this;
+    }
+
+    /**
+     * Sets the {@link Elasticsearch7SearchHitDeserializationSchema} for the 
Elasticsearch source.
+     * The given schema will be used to deserialize {@link 
org.elasticsearch.search.SearchHit}s into
+     * the output type.
+     *
+     * @param deserializationSchema the deserialization schema to use
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setDeserializationSchema(
+            Elasticsearch7SearchHitDeserializationSchema<OUT> 
deserializationSchema) {
+        checkNotNull(deserializationSchema);
+        this.deserializationSchema = deserializationSchema;
+        return this;
+    }
+
+    /**
+     * Sets the hosts where the Elasticsearch cluster nodes are reachable.
+     *
+     * @param hosts http addresses describing the node locations
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setHosts(HttpHost... hosts) {
+        checkNotNull(hosts);
+        checkState(hosts.length > 0, "Hosts cannot be empty.");
+        this.hosts = Arrays.asList(hosts);

Review comment:
       Is the case where individual elements of the `hosts` array are themselves `null`
handled elsewhere? `checkNotNull(hosts)` only rejects a null array, not null entries.

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7SourceBuilder.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema;
+
+import org.apache.http.HttpHost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The @builder class for {@link Elasticsearch7Source}.
+ *
+ * <p>The following example shows the minimum setup to create a 
ElasticsearchSource that reads the
+ * String values from an Elasticsearch index.
+ *
+ * <pre>{@code
+ * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder()
+ *     .setHosts(new HttpHost("localhost:9200")
+ *     .setIndexName("my-index")
+ *     .setDeserializationSchema(new 
Elasticsearch7SearchHitDeserializationSchema<String>() {
+ *          @Override
+ *          public void deserialize(SearchHit record, Collector<String> out) 
throws IOException {
+ *              out.collect(record.getSourceAsString());
+ *          }
+ *
+ *          @Override
+ *          public TypeInformation<String> getProducedType() {
+ *              return TypeInformation.of(String.class);
+ *          }
+ *      })
+ *     .build();
+ * }</pre>
+ *
+ * <p>The ElasticsearchSource runs in a {@link Boundedness#BOUNDED} mode and 
stops when the entire
+ * index has been read.
+ *
+ * <p>Check the Java docs of each individual methods to learn more about the 
settings to build a
+ * ElasticsearchSource.
+ */
+@PublicEvolving
+public class Elasticsearch7SourceBuilder<OUT> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(Elasticsearch7SourceBuilder.class);
+
+    private Duration pitKeepAlive = Duration.ofMinutes(5);
+    private int numberOfSearchSlices = 1;
+    private String indexName;
+    private Elasticsearch7SearchHitDeserializationSchema<OUT> 
deserializationSchema;
+
+    private List<HttpHost> hosts;
+    private String username;
+    private String password;
+    private String connectionPathPrefix;
+    private Integer connectionTimeout;
+    private Integer socketTimeout;
+    private Integer connectionRequestTimeout;
+
+    Elasticsearch7SourceBuilder() {}
+
+    private boolean isGreaterOrEqual(Duration d1, Duration d2) {
+        return ((d1.compareTo(d2) >= 0));
+    }
+
+    /**
+     * Sets the keep alive duration for the Point-in-Time (PIT). The PIT is 
required to a snapshot
+     * of the data in the index, to avoid reading the same data on subsequent 
search calls. If the
+     * PIT has expired the source will not be able to read data from this 
snapshot again. This also
+     * means that recovering from a checkpoint whose PIT has expired is not 
possible.
+     *
+     * @param pitKeepAlive duration of the PIT keep alive
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setPitKeepAlive(Duration 
pitKeepAlive) {
+        checkNotNull(pitKeepAlive);
+        checkArgument(
+                isGreaterOrEqual(pitKeepAlive, Duration.ofMinutes(5)),
+                "PIT keep alive should be at least 5 minutes.");
+        this.pitKeepAlive = pitKeepAlive;
+        return this;
+    }
+
+    /**
+     * Sets the number of search slices to be used to read the index. The 
number of search slices
+     * should be a multiple of the number of shards in your Elasticsearch 
cluster.
+     *
+     * @param numberOfSearchSlices the number of search slices
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setNumberOfSearchSlices(int 
numberOfSearchSlices) {
+        checkArgument(numberOfSearchSlices > 0, "Number of search slices must 
be greater than 0.");
+        this.numberOfSearchSlices = numberOfSearchSlices;
+        return this;
+    }
+
+    /**
+     * Sets the name of the Elasticsearch index to be read.
+     *
+     * @param indexName name of the Elasticsearch index
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setIndexName(String indexName) {
+        checkNotNull(indexName);
+        this.indexName = indexName;
+        return this;
+    }
+
+    /**
+     * Sets the {@link Elasticsearch7SearchHitDeserializationSchema} for the 
Elasticsearch source.
+     * The given schema will be used to deserialize {@link 
org.elasticsearch.search.SearchHit}s into
+     * the output type.
+     *
+     * @param deserializationSchema the deserialization schema to use
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setDeserializationSchema(
+            Elasticsearch7SearchHitDeserializationSchema<OUT> 
deserializationSchema) {
+        checkNotNull(deserializationSchema);
+        this.deserializationSchema = deserializationSchema;
+        return this;
+    }
+
+    /**
+     * Sets the hosts where the Elasticsearch cluster nodes are reachable.
+     *
+     * @param hosts http addresses describing the node locations
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setHosts(HttpHost... hosts) {
+        checkNotNull(hosts);
+        checkState(hosts.length > 0, "Hosts cannot be empty.");
+        this.hosts = Arrays.asList(hosts);
+        return this;
+    }
+
+    /**
+     * Sets the username used to authenticate the connection with the 
Elasticsearch cluster.
+     *
+     * @param username of the Elasticsearch cluster user
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setConnectionUsername(String 
username) {
+        checkNotNull(username);
+        this.username = username;
+        return this;
+    }
+
+    /**
+     * Sets the password used to authenticate the connection with the 
Elasticsearch cluster.
+     *
+     * @param password of the Elasticsearch cluster user
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setConnectionPassword(String 
password) {
+        checkNotNull(password);
+        this.password = password;
+        return this;
+    }
+
+    /**
+     * Sets a prefix which used for every REST communication to the 
Elasticsearch cluster.
+     *
+     * @param prefix for the communication
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setConnectionPathPrefix(String 
prefix) {
+        checkNotNull(prefix);
+        this.connectionPathPrefix = prefix;
+        return this;
+    }
+
+    /**
+     * Sets the timeout for requesting the connection of the Elasticsearch 
cluster from the
+     * connection manager.
+     *
+     * @param connectionRequestTimeout tiemout for the connection request

Review comment:
       ```suggestion
        * @param connectionRequestTimeout timeout for the connection request
   ```

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7SourceBuilder.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema;
+
+import org.apache.http.HttpHost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The builder class for {@link Elasticsearch7Source}.
+ *
+ * <p>The following example shows the minimum setup to create an Elasticsearch7Source that reads the
+ * String values from an Elasticsearch index.
+ *
+ * <pre>{@code
+ * Elasticsearch7Source<String> source = Elasticsearch7Source.<String>builder()
+ *     .setHosts(new HttpHost("localhost", 9200, "http"))
+ *     .setIndexName("my-index")
+ *     .setDeserializationSchema(new Elasticsearch7SearchHitDeserializationSchema<String>() {
+ *          @Override
+ *          public void deserialize(SearchHit record, Collector<String> out) throws IOException {
+ *              out.collect(record.getSourceAsString());
+ *          }
+ *
+ *          @Override
+ *          public TypeInformation<String> getProducedType() {
+ *              return TypeInformation.of(String.class);
+ *          }
+ *      })
+ *     .build();
+ * }</pre>
+ *
+ * <p>The Elasticsearch7Source runs in {@link Boundedness#BOUNDED} mode and stops when the entire
+ * index has been read.
+ *
+ * <p>See the Javadoc of the individual methods for details on the settings used to build an
+ * Elasticsearch7Source.
+ */
+@PublicEvolving
+public class Elasticsearch7SourceBuilder<OUT> {
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7SourceBuilder.class);
+
+    // ---- source settings (keep-alive and slice count carry defaults) ----
+
+    private Duration pitKeepAlive = Duration.ofMinutes(5);
+    private int numberOfSearchSlices = 1;
+    private String indexName;
+    private Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema;
+
+    // ---- network client settings (null until the corresponding setter is called) ----
+
+    private List<HttpHost> hosts;
+    private String username;
+    private String password;
+    private String connectionPathPrefix;
+    private Integer connectionTimeout;
+    private Integer socketTimeout;
+    private Integer connectionRequestTimeout;
+
+    /** Package-private; instances are obtained via {@code Elasticsearch7Source.builder()}. */
+    Elasticsearch7SourceBuilder() {
+        // every field is initialised by its declaration or later by a setter
+    }
+
+    private boolean isGreaterOrEqual(Duration d1, Duration d2) {
+        return ((d1.compareTo(d2) >= 0));
+    }
+
+    /**
+     * Sets the keep-alive duration of the Point-in-Time (PIT). The PIT keeps a snapshot of the
+     * data in the index so that subsequent search calls do not read the same data again. Once the
+     * PIT has expired, the source can no longer read data from that snapshot, which also means
+     * that recovering from a checkpoint whose PIT has expired is not possible.
+     *
+     * @param pitKeepAlive duration of the PIT keep-alive; must be at least 5 minutes
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setPitKeepAlive(Duration pitKeepAlive) {
+        final Duration minimumKeepAlive = Duration.ofMinutes(5);
+        checkArgument(
+                isGreaterOrEqual(checkNotNull(pitKeepAlive), minimumKeepAlive),
+                "PIT keep alive should be at least 5 minutes.");
+        this.pitKeepAlive = pitKeepAlive;
+        return this;
+    }
+
+    /**
+     * Sets how many search slices are used to read the index. The number of search slices
+     * should be a multiple of the number of shards in your Elasticsearch cluster.
+     *
+     * @param numberOfSearchSlices the number of search slices; must be positive
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setNumberOfSearchSlices(int numberOfSearchSlices) {
+        final boolean isPositive = numberOfSearchSlices > 0;
+        checkArgument(isPositive, "Number of search slices must be greater than 0.");
+        this.numberOfSearchSlices = numberOfSearchSlices;
+        return this;
+    }
+
+    /**
+     * Sets the name of the Elasticsearch index that is read.
+     *
+     * @param indexName name of the Elasticsearch index; must not be null
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setIndexName(String indexName) {
+        this.indexName = checkNotNull(indexName);
+        return this;
+    }
+
+    /**
+     * Sets the {@link Elasticsearch7SearchHitDeserializationSchema} of the Elasticsearch source,
+     * which turns each {@link org.elasticsearch.search.SearchHit} into the output type.
+     *
+     * @param deserializationSchema the deserialization schema to use; must not be null
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setDeserializationSchema(
+            Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema) {
+        this.deserializationSchema = checkNotNull(deserializationSchema);
+        return this;
+    }
+
+    /**
+     * Sets the hosts where the Elasticsearch cluster nodes are reachable.
+     *
+     * @param hosts http addresses describing the node locations; at least one, none null
+     * @return this builder
+     * @throws NullPointerException if {@code hosts} or any of its elements is null
+     * @throws IllegalArgumentException if {@code hosts} is empty
+     */
+    public Elasticsearch7SourceBuilder<OUT> setHosts(HttpHost... hosts) {
+        checkNotNull(hosts, "Hosts must not be null.");
+        // An invalid argument is reported as IllegalArgumentException, like the other setters do.
+        checkArgument(hosts.length > 0, "Hosts cannot be empty.");
+        for (HttpHost host : hosts) {
+            checkNotNull(host, "Hosts must not contain null entries.");
+        }
+        // Copy the varargs array so that later changes to a caller-owned array do not leak in.
+        this.hosts = Arrays.asList(hosts.clone());
+        return this;
+    }
+
+    /**
+     * Sets the username for authenticating against the Elasticsearch cluster.
+     *
+     * @param username name of the Elasticsearch cluster user; must not be null
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setConnectionUsername(String username) {
+        this.username = checkNotNull(username);
+        return this;
+    }
+
+    /**
+     * Sets the password for authenticating against the Elasticsearch cluster.
+     *
+     * @param password password of the Elasticsearch cluster user; must not be null
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setConnectionPassword(String password) {
+        this.password = checkNotNull(password);
+        return this;
+    }
+
+    /**
+     * Sets a prefix that is used for every REST request sent to the Elasticsearch cluster.
+     *
+     * @param prefix path prefix for the communication; must not be null
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setConnectionPathPrefix(String prefix) {
+        this.connectionPathPrefix = checkNotNull(prefix);
+        return this;
+    }
+
+    /**
+     * Sets the timeout for requesting the connection of the Elasticsearch 
cluster from the

Review comment:
       ```suggestion
        * Sets the timeout for requesting the connection to the Elasticsearch 
cluster from the
   ```
   This would be clearer.

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/enumerator/Elasticsearch7SourceEnumerator.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.elasticsearch.common.ElasticsearchUtil;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.source.Elasticsearch7SourceConfiguration;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.search.OpenPointInTimeRequest;
+import org.elasticsearch.action.search.OpenPointInTimeResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.core.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** The enumerator class for ElasticsearchSource. */

Review comment:
       It would be nice to have a brief description about the concepts this 
class relies on in ES ( PIT, in particular).

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/reader/Elasticsearch7SearchHitDeserializationSchema.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import org.elasticsearch.search.SearchHit;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A schema bridge for deserializing Elasticsearch's {@link SearchHit} into a 
flink managed

Review comment:
       ```suggestion
    * A schema bridge for deserializing Elasticsearch's {@link SearchHit} into 
a Flink managed
   ```

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/enumerator/Elasticsearch7SourceEnumStateSerializer.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.enumerator;
+
+import org.apache.flink.annotation.PublicEvolving;
+import 
org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * The {@link org.apache.flink.core.io.SimpleVersionedSerializer Serializer} for the enumerator
+ * state of the Elasticsearch source.
+ */
+@PublicEvolving
+public class Elasticsearch7SourceEnumStateSerializer
+        implements SimpleVersionedSerializer<Elasticsearch7SourceEnumState> {
+    /** Version of the binary layout produced by this serializer. */
+    private static final int CURRENT_VERSION = 0;
+
+    /** Returns the version of the layout written by {@link #serialize}. */
+    @Override
+    public int getVersion() {
+        final int version = CURRENT_VERSION;
+        return version;
+    }
+
+    /**
+     * Writes the assigned splits as a split count followed by, for each split, its PIT id
+     * (modified UTF-8) and its slice id.
+     *
+     * <p>NOTE(review): {@link DataOutputStream#writeUTF} rejects strings whose encoded form exceeds
+     * 65535 bytes; confirm that PIT ids always stay below that limit.
+     */
+    @Override
+    public byte[] serialize(Elasticsearch7SourceEnumState obj) throws IOException {
+        final Set<Elasticsearch7Split> splits = obj.getAssignedSplits();
+        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        // Closing the data stream also closes the (no-op) byte buffer underneath it.
+        try (DataOutputStream dataOut = new DataOutputStream(buffer)) {
+            dataOut.writeInt(splits.size());
+            for (Elasticsearch7Split assigned : splits) {
+                dataOut.writeUTF(assigned.getPitId());
+                dataOut.writeInt(assigned.getSliceId());
+            }
+            dataOut.flush();
+            return buffer.toByteArray();
+        }
+    }
+
+    @Override
+    public Elasticsearch7SourceEnumState deserialize(int version, byte[] 
serialized)
+            throws IOException {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+                DataInputStream in = new DataInputStream(bais)) {
+
+            final int numSplits = in.readInt();
+            Set<Elasticsearch7Split> splits = new HashSet<>(numSplits);

Review comment:
       Consistency question: is there a reason all the remaining variables in 
this method are declared final, but this one is not?

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/split/Elasticsearch7Split.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.split;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.SourceSplit;
+
+import java.util.Objects;
+
+/** A {@link SourceSplit} for an Elasticsearch 'slice'. */

Review comment:
       Would be helpful to describe what the PIT is and how the actual 
splitting of data in Flink maps to the ES concepts.

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7SourceBuilder.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema;
+
+import org.apache.http.HttpHost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The builder class for {@link Elasticsearch7Source}.
+ *
+ * <p>The following example shows the minimum setup to create an Elasticsearch7Source that reads the
+ * String values from an Elasticsearch index.
+ *
+ * <pre>{@code
+ * Elasticsearch7Source<String> source = Elasticsearch7Source.<String>builder()
+ *     .setHosts(new HttpHost("localhost", 9200, "http"))
+ *     .setIndexName("my-index")
+ *     .setDeserializationSchema(new Elasticsearch7SearchHitDeserializationSchema<String>() {
+ *          @Override
+ *          public void deserialize(SearchHit record, Collector<String> out) throws IOException {
+ *              out.collect(record.getSourceAsString());
+ *          }
+ *
+ *          @Override
+ *          public TypeInformation<String> getProducedType() {
+ *              return TypeInformation.of(String.class);
+ *          }
+ *      })
+ *     .build();
+ * }</pre>
+ *
+ * <p>The Elasticsearch7Source runs in {@link Boundedness#BOUNDED} mode and stops when the entire
+ * index has been read.
+ *
+ * <p>See the Javadoc of the individual methods for details on the settings used to build an
+ * Elasticsearch7Source.
+ */
+@PublicEvolving
+public class Elasticsearch7SourceBuilder<OUT> {
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7SourceBuilder.class);
+
+    // ---- source settings (keep-alive and slice count carry defaults) ----
+
+    private Duration pitKeepAlive = Duration.ofMinutes(5);
+    private int numberOfSearchSlices = 1;
+    private String indexName;
+    private Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema;
+
+    // ---- network client settings (null until the corresponding setter is called) ----
+
+    private List<HttpHost> hosts;
+    private String username;
+    private String password;
+    private String connectionPathPrefix;
+    private Integer connectionTimeout;
+    private Integer socketTimeout;
+    private Integer connectionRequestTimeout;
+
+    /** Package-private; instances are obtained via {@code Elasticsearch7Source.builder()}. */
+    Elasticsearch7SourceBuilder() {
+        // every field is initialised by its declaration or later by a setter
+    }
+
+    private boolean isGreaterOrEqual(Duration d1, Duration d2) {
+        return ((d1.compareTo(d2) >= 0));
+    }
+
+    /**
+     * Sets the keep-alive duration of the Point-in-Time (PIT). The PIT keeps a snapshot of the
+     * data in the index so that subsequent search calls do not read the same data again. Once the
+     * PIT has expired, the source can no longer read data from that snapshot, which also means
+     * that recovering from a checkpoint whose PIT has expired is not possible.
+     *
+     * @param pitKeepAlive duration of the PIT keep-alive; must be at least 5 minutes
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setPitKeepAlive(Duration pitKeepAlive) {
+        final Duration minimumKeepAlive = Duration.ofMinutes(5);
+        checkArgument(
+                isGreaterOrEqual(checkNotNull(pitKeepAlive), minimumKeepAlive),
+                "PIT keep alive should be at least 5 minutes.");
+        this.pitKeepAlive = pitKeepAlive;
+        return this;
+    }
+
+    /**
+     * Sets how many search slices are used to read the index. The number of search slices
+     * should be a multiple of the number of shards in your Elasticsearch cluster.
+     *
+     * @param numberOfSearchSlices the number of search slices; must be positive
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setNumberOfSearchSlices(int numberOfSearchSlices) {
+        final boolean isPositive = numberOfSearchSlices > 0;
+        checkArgument(isPositive, "Number of search slices must be greater than 0.");
+        this.numberOfSearchSlices = numberOfSearchSlices;
+        return this;
+    }
+
+    /**
+     * Sets the name of the Elasticsearch index that is read.
+     *
+     * @param indexName name of the Elasticsearch index; must not be null
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setIndexName(String indexName) {
+        this.indexName = checkNotNull(indexName);
+        return this;
+    }
+
+    /**
+     * Sets the {@link Elasticsearch7SearchHitDeserializationSchema} of the Elasticsearch source,
+     * which turns each {@link org.elasticsearch.search.SearchHit} into the output type.
+     *
+     * @param deserializationSchema the deserialization schema to use; must not be null
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setDeserializationSchema(
+            Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema) {
+        this.deserializationSchema = checkNotNull(deserializationSchema);
+        return this;
+    }
+
+    /**
+     * Sets the hosts where the Elasticsearch cluster nodes are reachable.
+     *
+     * @param hosts http addresses describing the node locations; at least one, none null
+     * @return this builder
+     * @throws NullPointerException if {@code hosts} or any of its elements is null
+     * @throws IllegalArgumentException if {@code hosts} is empty
+     */
+    public Elasticsearch7SourceBuilder<OUT> setHosts(HttpHost... hosts) {
+        checkNotNull(hosts, "Hosts must not be null.");
+        // An invalid argument is reported as IllegalArgumentException, like the other setters do.
+        checkArgument(hosts.length > 0, "Hosts cannot be empty.");
+        for (HttpHost host : hosts) {
+            checkNotNull(host, "Hosts must not contain null entries.");
+        }
+        // Copy the varargs array so that later changes to a caller-owned array do not leak in.
+        this.hosts = Arrays.asList(hosts.clone());
+        return this;
+    }
+
+    /**
+     * Sets the username for authenticating against the Elasticsearch cluster.
+     *
+     * @param username name of the Elasticsearch cluster user; must not be null
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setConnectionUsername(String username) {
+        this.username = checkNotNull(username);
+        return this;
+    }
+
+    /**
+     * Sets the password for authenticating against the Elasticsearch cluster.
+     *
+     * @param password password of the Elasticsearch cluster user; must not be null
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setConnectionPassword(String password) {
+        this.password = checkNotNull(password);
+        return this;
+    }
+
+    /**
+     * Sets a prefix that is used for every REST request sent to the Elasticsearch cluster.
+     *
+     * @param prefix path prefix for the communication; must not be null
+     * @return this builder
+     */
+    public Elasticsearch7SourceBuilder<OUT> setConnectionPathPrefix(String prefix) {
+        this.connectionPathPrefix = checkNotNull(prefix);
+        return this;
+    }
+
+    /**
+     * Sets the timeout for requesting a connection to the Elasticsearch cluster from the
+     * connection manager.
+     *
+     * @param connectionRequestTimeout timeout for the connection request; must not be negative
+     * @return this builder
+     * @throws IllegalArgumentException if {@code connectionRequestTimeout} is negative
+     */
+    public Elasticsearch7SourceBuilder<OUT> setConnectionRequestTimeout(
+            int connectionRequestTimeout) {
+        // An invalid argument is reported as IllegalArgumentException, like the other setters do.
+        checkArgument(
+                connectionRequestTimeout >= 0,
+                "Connection request timeout must be larger than or equal to 0.");
+        this.connectionRequestTimeout = connectionRequestTimeout;
+        return this;
+    }
+
+    /**
+     * Sets the timeout for establishing a connection to the Elasticsearch cluster.
+     *
+     * @param connectionTimeout timeout for the connection; must not be negative
+     * @return this builder
+     * @throws IllegalArgumentException if {@code connectionTimeout} is negative
+     */
+    public Elasticsearch7SourceBuilder<OUT> setConnectionTimeout(int connectionTimeout) {
+        // An invalid argument is reported as IllegalArgumentException, like the other setters do.
+        checkArgument(
+                connectionTimeout >= 0, "Connection timeout must be larger than or equal to 0.");
+        this.connectionTimeout = connectionTimeout;
+        return this;
+    }
+
+    /**
+     * Sets the timeout for waiting for data or, put differently, the maximum period of inactivity
+     * between two consecutive data packets.
+     *
+     * @param socketTimeout timeout for the socket; must not be negative
+     * @return this builder
+     * @throws IllegalArgumentException if {@code socketTimeout} is negative
+     */
+    public Elasticsearch7SourceBuilder<OUT> setSocketTimeout(int socketTimeout) {
+        // An invalid argument is reported as IllegalArgumentException, like the other setters do.
+        checkArgument(socketTimeout >= 0, "Socket timeout must be larger than or equal to 0.");
+        this.socketTimeout = socketTimeout;
+        return this;
+    }
+
+    private void checkRequiredParameters() {

Review comment:
       Required parameters should be part of the builder's constructor.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to