afedulov commented on a change in pull request #18153: URL: https://github.com/apache/flink/pull/18153#discussion_r798750480
########## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/common/ElasticsearchUtil.java ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.common; + +import org.apache.flink.annotation.Internal; + +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.client.RestClientBuilder; + +/** Collection of utility methods for the Elasticsearch source and sink. 
*/ +@Internal +public class ElasticsearchUtil { + + public static RestClientBuilder configureRestClientBuilder( + RestClientBuilder builder, NetworkClientConfig config) { + if (config.getConnectionPathPrefix() != null) { Review comment: Nit: add a null check for `config`. ########## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/common/ElasticsearchUtil.java ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.common; + +import org.apache.flink.annotation.Internal; + +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.client.RestClientBuilder; + +/** Collection of utility methods for the Elasticsearch source and sink.
*/ +@Internal +public class ElasticsearchUtil { + + public static RestClientBuilder configureRestClientBuilder( + RestClientBuilder builder, NetworkClientConfig config) { + if (config.getConnectionPathPrefix() != null) { + builder.setPathPrefix(config.getConnectionPathPrefix()); Review comment: Nit: add a null check for `builder`. ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7Source.java ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.elasticsearch.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumState; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumStateSerializer; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumerator; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SourceReader; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7SplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.UserCodeClassLoader; + +/** + * The Source implementation for Elasticsearch. Please use a {@link Elasticsearch7SourceBuilder} to + * construct a {@link Elasticsearch7Source}. 
The following example shows how to create a + * ElasticsearchSource emitting records of <code> Review comment: ```suggestion * ElasticsearchSource emitting records of {@code ``` ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7Source.java ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.elasticsearch.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumState; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumStateSerializer; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumerator; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SourceReader; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7SplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.UserCodeClassLoader; + +/** + * The Source implementation for Elasticsearch. Please use a {@link Elasticsearch7SourceBuilder} to + * construct a {@link Elasticsearch7Source}. The following example shows how to create a + * ElasticsearchSource emitting records of <code> + * String</code> type. Review comment: ```suggestion * String} type. 
``` ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/enumerator/Elasticsearch7SourceEnumStateSerializer.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.source.enumerator; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +/** + * The {@link org.apache.flink.core.io.SimpleVersionedSerializer Serializer} for the enumerator + * state of Elasticsearch source. Review comment: ```suggestion * state of the Elasticsearch source. ``` ########## File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/common/NetworkClientConfig.java ########## @@ -16,13 +16,17 @@ * limitations under the License. 
*/ -package org.apache.flink.connector.elasticsearch.sink; +package org.apache.flink.connector.elasticsearch.common; + +import org.apache.flink.annotation.Internal; import javax.annotation.Nullable; import java.io.Serializable; -class NetworkClientConfig implements Serializable { +/** This class encapsulates information on how to connect against an Elasticsearch cluster. */ Review comment: ```suggestion /** This class encapsulates information on how to connect to an Elasticsearch cluster. */ ``` ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSource.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.elasticsearch.source.Elasticsearch7Source; +import org.apache.flink.connector.elasticsearch.source.Elasticsearch7SourceBuilder; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DataStreamScanProvider; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.StringUtils; + +import org.apache.http.HttpHost; + +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** An Elasticsearch {@link ScanTableSource}. */ +@Internal +public class Elasticsearch7DynamicSource implements ScanTableSource { Review comment: Using `Dynamic` without `Table` in the name is somewhat confusing. I believe the naming should be consistent with existing usage, e.g. `HBaseDynamicTableSource`, `JdbcDynamicTableSource`. The same applies to the related configuration classes, etc.
########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/RowDataElasticsearch7SearchHitDeserializationSchema.java ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.elasticsearch.table; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.JsonToRowDataConverters; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; +import org.apache.flink.util.Collector; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.elasticsearch.search.SearchHit; + +import java.io.IOException; + +/** A {@link Elasticsearch7SearchHitDeserializationSchema} that deserializes to {@link RowData}. */ +public class RowDataElasticsearch7SearchHitDeserializationSchema + implements Elasticsearch7SearchHitDeserializationSchema<RowData> { + + /** TypeInformation of the produced {@link RowData}. */ + private final TypeInformation<RowData> producedTypeInformation; + + /** Flag indicating whether to fail if a field is missing. */ + private final boolean failOnMissingField; + + /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */ + private final boolean ignoreParseErrors; + + /** Timestamp format specification which is used to parse timestamp. */ + private final TimestampFormat timestampFormat; + + /** + * Runtime converter that converts {@link JsonNode}s into objects of Flink SQL internal data + * structures. 
+ */ + private final JsonToRowDataConverters.JsonToRowDataConverter runtimeConverter; + + /** Object mapper for parsing the JSON. */ + private final ObjectMapper objectMapper = new ObjectMapper(); + + RowDataElasticsearch7SearchHitDeserializationSchema( + RowType rowType, + TypeInformation<RowData> producedTypeInformation, + boolean failOnMissingField, + boolean ignoreParseErrors, + TimestampFormat timestampFormat) { + this.producedTypeInformation = producedTypeInformation; + this.failOnMissingField = failOnMissingField; + this.ignoreParseErrors = ignoreParseErrors; + this.timestampFormat = timestampFormat; + + if (ignoreParseErrors && failOnMissingField) { + throw new IllegalArgumentException( + "JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled."); + } + + this.runtimeConverter = + new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat) + .createConverter(rowType); + + boolean hasDecimalType = + LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType); + if (hasDecimalType) { + objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS); Review comment: Just curious, what is the rationale for not enabling this by default? Memory usage? ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7Source.java ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumState; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumStateSerializer; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumerator; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SourceReader; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7SplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.MetricGroup; +import 
org.apache.flink.util.UserCodeClassLoader; + +/** + * The Source implementation for Elasticsearch. Please use a {@link Elasticsearch7SourceBuilder} to + * construct a {@link Elasticsearch7Source}. The following example shows how to create a + * ElasticsearchSource emitting records of <code> + * String</code> type. + * + * <pre>{@code + * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder() + * .setHosts(new HttpHost("localhost:9200") + * .setIndexName("my-index") + * .setDeserializationSchema(new Elasticsearch7SearchHitDeserializationSchema<String>() { + * @Override + * public void deserialize(SearchHit record, Collector<String> out) throws IOException { + * out.collect(record.getSourceAsString()); + * } + * + * @Override + * public TypeInformation<String> getProducedType() { + * return TypeInformation.of(String.class); + * } + * }) + * .build(); + * }</pre> + * + * <p>The ElasticsearchSource runs in a {@link Boundedness#BOUNDED} mode and stops when the entire * + * index has been read. + * + * <p>See {@link Elasticsearch7SourceBuilder} for more details. + * + * @param <OUT> the output type of the source. 
+ */ +@PublicEvolving +public class Elasticsearch7Source<OUT> + implements Source<OUT, Elasticsearch7Split, Elasticsearch7SourceEnumState>, + ResultTypeQueryable<OUT> { + + private final Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema; + private final Elasticsearch7SourceConfiguration sourceConfiguration; + private final NetworkClientConfig networkClientConfig; + + Elasticsearch7Source( + Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema, + Elasticsearch7SourceConfiguration sourceConfiguration, + NetworkClientConfig networkClientConfig) { + this.deserializationSchema = deserializationSchema; + this.sourceConfiguration = sourceConfiguration; + this.networkClientConfig = networkClientConfig; + } + + /** + * Create an {@link Elasticsearch7SourceBuilder} to construct a new {@link Review comment: ```suggestion * Creates an {@link Elasticsearch7SourceBuilder} for constructing a new {@link ``` ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7Source.java ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.elasticsearch.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumState; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumStateSerializer; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumerator; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SourceReader; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7SplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.UserCodeClassLoader; + +/** + * The Source implementation for Elasticsearch. Please use a {@link Elasticsearch7SourceBuilder} to + * construct a {@link Elasticsearch7Source}. The following example shows how to create a + * ElasticsearchSource emitting records of <code> + * String</code> type. 
+ * + * <pre>{@code + * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder() + * .setHosts(new HttpHost("localhost:9200") + * .setIndexName("my-index") + * .setDeserializationSchema(new Elasticsearch7SearchHitDeserializationSchema<String>() { + * @Override + * public void deserialize(SearchHit record, Collector<String> out) throws IOException { + * out.collect(record.getSourceAsString()); + * } + * + * @Override + * public TypeInformation<String> getProducedType() { + * return TypeInformation.of(String.class); + * } + * }) + * .build(); + * }</pre> + * + * <p>The ElasticsearchSource runs in a {@link Boundedness#BOUNDED} mode and stops when the entire * + * index has been read. + * + * <p>See {@link Elasticsearch7SourceBuilder} for more details. + * + * @param <OUT> the output type of the source. + */ +@PublicEvolving +public class Elasticsearch7Source<OUT> + implements Source<OUT, Elasticsearch7Split, Elasticsearch7SourceEnumState>, + ResultTypeQueryable<OUT> { + + private final Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema; + private final Elasticsearch7SourceConfiguration sourceConfiguration; + private final NetworkClientConfig networkClientConfig; + + Elasticsearch7Source( + Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema, Review comment: Nit: add null checks for the constructor parameters. ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7Source.java ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumState; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumStateSerializer; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumerator; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SourceReader; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7SplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.MetricGroup; +import 
org.apache.flink.util.UserCodeClassLoader; + +/** + * The Source implementation for Elasticsearch. Please use a {@link Elasticsearch7SourceBuilder} to + * construct a {@link Elasticsearch7Source}. The following example shows how to create a + * ElasticsearchSource emitting records of <code> + * String</code> type. + * + * <pre>{@code + * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder() + * .setHosts(new HttpHost("localhost:9200") + * .setIndexName("my-index") + * .setDeserializationSchema(new Elasticsearch7SearchHitDeserializationSchema<String>() { + * @Override + * public void deserialize(SearchHit record, Collector<String> out) throws IOException { + * out.collect(record.getSourceAsString()); + * } + * + * @Override + * public TypeInformation<String> getProducedType() { + * return TypeInformation.of(String.class); + * } + * }) + * .build(); + * }</pre> + * + * <p>The ElasticsearchSource runs in a {@link Boundedness#BOUNDED} mode and stops when the entire * Review comment: ```suggestion * <p>The ElasticsearchSource runs in a {@link Boundedness#BOUNDED} mode and stops when the entire ``` ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7Source.java ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumState; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumStateSerializer; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumerator; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SourceReader; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7SplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.MetricGroup; +import 
org.apache.flink.util.UserCodeClassLoader; + +/** + * The Source implementation for Elasticsearch. Please use a {@link Elasticsearch7SourceBuilder} to + * construct a {@link Elasticsearch7Source}. The following example shows how to create a + * ElasticsearchSource emitting records of <code> + * String</code> type. + * + * <pre>{@code + * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder() Review comment: A couple of things seem to be off: - `new` is not needed (builder() is a static method) - `setHosts` line is missing a parenthesis Maybe check that the example compiles outside of the docs. ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7Source.java ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.elasticsearch.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumState; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumStateSerializer; +import org.apache.flink.connector.elasticsearch.source.enumerator.Elasticsearch7SourceEnumerator; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SourceReader; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7SplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.UserCodeClassLoader; + +/** + * The Source implementation for Elasticsearch. Please use a {@link Elasticsearch7SourceBuilder} to + * construct a {@link Elasticsearch7Source}. The following example shows how to create a Review comment: ```suggestion * construct an {@link Elasticsearch7Source}. 
The following example shows how to create an ``` ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7SourceBuilder.java ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema; + +import org.apache.http.HttpHost; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The @builder class for {@link Elasticsearch7Source}. 
+ * + * <p>The following example shows the minimum setup to create a ElasticsearchSource that reads the + * String values from an Elasticsearch index. + * + * <pre>{@code + * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder() Review comment: Same as the above: - `new` is not needed (builder() is a static method) - `setHosts` line is missing a parenthesis - check that the example compiles outside of the docs ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7SourceBuilder.java ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.elasticsearch.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema; + +import org.apache.http.HttpHost; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The @builder class for {@link Elasticsearch7Source}. + * + * <p>The following example shows the minimum setup to create a ElasticsearchSource that reads the + * String values from an Elasticsearch index. + * + * <pre>{@code + * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder() + * .setHosts(new HttpHost("localhost:9200") + * .setIndexName("my-index") + * .setDeserializationSchema(new Elasticsearch7SearchHitDeserializationSchema<String>() { + * @Override + * public void deserialize(SearchHit record, Collector<String> out) throws IOException { + * out.collect(record.getSourceAsString()); + * } + * + * @Override + * public TypeInformation<String> getProducedType() { + * return TypeInformation.of(String.class); + * } + * }) + * .build(); + * }</pre> + * + * <p>The ElasticsearchSource runs in a {@link Boundedness#BOUNDED} mode and stops when the entire + * index has been read. + * + * <p>Check the Java docs of each individual methods to learn more about the settings to build a + * ElasticsearchSource. 
+ */ +@PublicEvolving +public class Elasticsearch7SourceBuilder<OUT> { + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7SourceBuilder.class); + + private Duration pitKeepAlive = Duration.ofMinutes(5); + private int numberOfSearchSlices = 1; + private String indexName; + private Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema; + + private List<HttpHost> hosts; + private String username; + private String password; + private String connectionPathPrefix; + private Integer connectionTimeout; + private Integer socketTimeout; + private Integer connectionRequestTimeout; + + Elasticsearch7SourceBuilder() {} + + private boolean isGreaterOrEqual(Duration d1, Duration d2) { + return ((d1.compareTo(d2) >= 0)); + } + + /** + * Sets the keep alive duration for the Point-in-Time (PIT). The PIT is required to a snapshot + * of the data in the index, to avoid reading the same data on subsequent search calls. If the + * PIT has expired the source will not be able to read data from this snapshot again. This also + * means that recovering from a checkpoint whose PIT has expired is not possible. + * + * @param pitKeepAlive duration of the PIT keep alive + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setPitKeepAlive(Duration pitKeepAlive) { + checkNotNull(pitKeepAlive); + checkArgument( + isGreaterOrEqual(pitKeepAlive, Duration.ofMinutes(5)), + "PIT keep alive should be at least 5 minutes."); + this.pitKeepAlive = pitKeepAlive; + return this; + } + + /** + * Sets the number of search slices to be used to read the index. The number of search slices + * should be a multiple of the number of shards in your Elasticsearch cluster. 
+ * + * @param numberOfSearchSlices the number of search slices + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setNumberOfSearchSlices(int numberOfSearchSlices) { + checkArgument(numberOfSearchSlices > 0, "Number of search slices must be greater than 0."); + this.numberOfSearchSlices = numberOfSearchSlices; + return this; + } + + /** + * Sets the name of the Elasticsearch index to be read. + * + * @param indexName name of the Elasticsearch index + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setIndexName(String indexName) { + checkNotNull(indexName); + this.indexName = indexName; + return this; + } + + /** + * Sets the {@link Elasticsearch7SearchHitDeserializationSchema} for the Elasticsearch source. + * The given schema will be used to deserialize {@link org.elasticsearch.search.SearchHit}s into + * the output type. + * + * @param deserializationSchema the deserialization schema to use + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setDeserializationSchema( + Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema) { + checkNotNull(deserializationSchema); + this.deserializationSchema = deserializationSchema; + return this; + } + + /** + * Sets the hosts where the Elasticsearch cluster nodes are reachable. + * + * @param hosts http addresses describing the node locations + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setHosts(HttpHost... hosts) { + checkNotNull(hosts); + checkState(hosts.length > 0, "Hosts cannot be empty."); + this.hosts = Arrays.asList(hosts); + return this; + } + + /** + * Sets the username used to authenticate the connection with the Elasticsearch cluster. 
+ * + * @param username of the Elasticsearch cluster user + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setConnectionUsername(String username) { + checkNotNull(username); + this.username = username; + return this; + } + + /** + * Sets the password used to authenticate the connection with the Elasticsearch cluster. + * + * @param password of the Elasticsearch cluster user + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setConnectionPassword(String password) { + checkNotNull(password); + this.password = password; + return this; + } + + /** + * Sets a prefix which used for every REST communication to the Elasticsearch cluster. + * + * @param prefix for the communication + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setConnectionPathPrefix(String prefix) { + checkNotNull(prefix); + this.connectionPathPrefix = prefix; + return this; + } + + /** + * Sets the timeout for requesting the connection of the Elasticsearch cluster from the + * connection manager. + * + * @param connectionRequestTimeout tiemout for the connection request + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setConnectionRequestTimeout( + int connectionRequestTimeout) { + checkState( + connectionRequestTimeout >= 0, + "Connection request timeout must be larger than or equal to 0."); + this.connectionRequestTimeout = connectionRequestTimeout; + return this; + } + + /** + * Sets the timeout for establishing a connection of the Elasticsearch cluster. Review comment: ```suggestion * Sets the timeout for establishing a connection to the Elasticsearch cluster. ``` ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7SourceBuilder.java ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema; + +import org.apache.http.HttpHost; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The @builder class for {@link Elasticsearch7Source}. + * + * <p>The following example shows the minimum setup to create a ElasticsearchSource that reads the + * String values from an Elasticsearch index. 
+ * + * <pre>{@code + * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder() + * .setHosts(new HttpHost("localhost:9200") + * .setIndexName("my-index") + * .setDeserializationSchema(new Elasticsearch7SearchHitDeserializationSchema<String>() { + * @Override + * public void deserialize(SearchHit record, Collector<String> out) throws IOException { + * out.collect(record.getSourceAsString()); + * } + * + * @Override + * public TypeInformation<String> getProducedType() { + * return TypeInformation.of(String.class); + * } + * }) + * .build(); + * }</pre> + * + * <p>The ElasticsearchSource runs in a {@link Boundedness#BOUNDED} mode and stops when the entire + * index has been read. + * + * <p>Check the Java docs of each individual methods to learn more about the settings to build a + * ElasticsearchSource. + */ +@PublicEvolving +public class Elasticsearch7SourceBuilder<OUT> { + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7SourceBuilder.class); + + private Duration pitKeepAlive = Duration.ofMinutes(5); + private int numberOfSearchSlices = 1; + private String indexName; + private Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema; + + private List<HttpHost> hosts; + private String username; + private String password; + private String connectionPathPrefix; + private Integer connectionTimeout; + private Integer socketTimeout; + private Integer connectionRequestTimeout; + + Elasticsearch7SourceBuilder() {} + + private boolean isGreaterOrEqual(Duration d1, Duration d2) { + return ((d1.compareTo(d2) >= 0)); + } + + /** + * Sets the keep alive duration for the Point-in-Time (PIT). The PIT is required to a snapshot + * of the data in the index, to avoid reading the same data on subsequent search calls. If the + * PIT has expired the source will not be able to read data from this snapshot again. This also + * means that recovering from a checkpoint whose PIT has expired is not possible. 
+ * + * @param pitKeepAlive duration of the PIT keep alive + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setPitKeepAlive(Duration pitKeepAlive) { + checkNotNull(pitKeepAlive); + checkArgument( + isGreaterOrEqual(pitKeepAlive, Duration.ofMinutes(5)), + "PIT keep alive should be at least 5 minutes."); + this.pitKeepAlive = pitKeepAlive; + return this; + } + + /** + * Sets the number of search slices to be used to read the index. The number of search slices + * should be a multiple of the number of shards in your Elasticsearch cluster. + * + * @param numberOfSearchSlices the number of search slices + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setNumberOfSearchSlices(int numberOfSearchSlices) { + checkArgument(numberOfSearchSlices > 0, "Number of search slices must be greater than 0."); + this.numberOfSearchSlices = numberOfSearchSlices; + return this; + } + + /** + * Sets the name of the Elasticsearch index to be read. + * + * @param indexName name of the Elasticsearch index + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setIndexName(String indexName) { + checkNotNull(indexName); + this.indexName = indexName; + return this; + } + + /** + * Sets the {@link Elasticsearch7SearchHitDeserializationSchema} for the Elasticsearch source. + * The given schema will be used to deserialize {@link org.elasticsearch.search.SearchHit}s into + * the output type. + * + * @param deserializationSchema the deserialization schema to use + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setDeserializationSchema( + Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema) { + checkNotNull(deserializationSchema); + this.deserializationSchema = deserializationSchema; + return this; + } + + /** + * Sets the hosts where the Elasticsearch cluster nodes are reachable. 
+ * + * @param hosts http addresses describing the node locations + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setHosts(HttpHost... hosts) { + checkNotNull(hosts); + checkState(hosts.length > 0, "Hosts cannot be empty."); + this.hosts = Arrays.asList(hosts); Review comment: Is the case where elements of the `hosts` array are themselves `null` handled elsewhere? ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7SourceBuilder.java ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.elasticsearch.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema; + +import org.apache.http.HttpHost; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The @builder class for {@link Elasticsearch7Source}. + * + * <p>The following example shows the minimum setup to create a ElasticsearchSource that reads the + * String values from an Elasticsearch index. + * + * <pre>{@code + * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder() + * .setHosts(new HttpHost("localhost:9200") + * .setIndexName("my-index") + * .setDeserializationSchema(new Elasticsearch7SearchHitDeserializationSchema<String>() { + * @Override + * public void deserialize(SearchHit record, Collector<String> out) throws IOException { + * out.collect(record.getSourceAsString()); + * } + * + * @Override + * public TypeInformation<String> getProducedType() { + * return TypeInformation.of(String.class); + * } + * }) + * .build(); + * }</pre> + * + * <p>The ElasticsearchSource runs in a {@link Boundedness#BOUNDED} mode and stops when the entire + * index has been read. + * + * <p>Check the Java docs of each individual methods to learn more about the settings to build a + * ElasticsearchSource. 
+ */ +@PublicEvolving +public class Elasticsearch7SourceBuilder<OUT> { + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7SourceBuilder.class); + + private Duration pitKeepAlive = Duration.ofMinutes(5); + private int numberOfSearchSlices = 1; + private String indexName; + private Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema; + + private List<HttpHost> hosts; + private String username; + private String password; + private String connectionPathPrefix; + private Integer connectionTimeout; + private Integer socketTimeout; + private Integer connectionRequestTimeout; + + Elasticsearch7SourceBuilder() {} + + private boolean isGreaterOrEqual(Duration d1, Duration d2) { + return ((d1.compareTo(d2) >= 0)); + } + + /** + * Sets the keep alive duration for the Point-in-Time (PIT). The PIT is required to a snapshot + * of the data in the index, to avoid reading the same data on subsequent search calls. If the + * PIT has expired the source will not be able to read data from this snapshot again. This also + * means that recovering from a checkpoint whose PIT has expired is not possible. + * + * @param pitKeepAlive duration of the PIT keep alive + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setPitKeepAlive(Duration pitKeepAlive) { + checkNotNull(pitKeepAlive); + checkArgument( + isGreaterOrEqual(pitKeepAlive, Duration.ofMinutes(5)), + "PIT keep alive should be at least 5 minutes."); + this.pitKeepAlive = pitKeepAlive; + return this; + } + + /** + * Sets the number of search slices to be used to read the index. The number of search slices + * should be a multiple of the number of shards in your Elasticsearch cluster. 
+ * + * @param numberOfSearchSlices the number of search slices + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setNumberOfSearchSlices(int numberOfSearchSlices) { + checkArgument(numberOfSearchSlices > 0, "Number of search slices must be greater than 0."); + this.numberOfSearchSlices = numberOfSearchSlices; + return this; + } + + /** + * Sets the name of the Elasticsearch index to be read. + * + * @param indexName name of the Elasticsearch index + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setIndexName(String indexName) { + checkNotNull(indexName); + this.indexName = indexName; + return this; + } + + /** + * Sets the {@link Elasticsearch7SearchHitDeserializationSchema} for the Elasticsearch source. + * The given schema will be used to deserialize {@link org.elasticsearch.search.SearchHit}s into + * the output type. + * + * @param deserializationSchema the deserialization schema to use + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setDeserializationSchema( + Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema) { + checkNotNull(deserializationSchema); + this.deserializationSchema = deserializationSchema; + return this; + } + + /** + * Sets the hosts where the Elasticsearch cluster nodes are reachable. + * + * @param hosts http addresses describing the node locations + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setHosts(HttpHost... hosts) { + checkNotNull(hosts); + checkState(hosts.length > 0, "Hosts cannot be empty."); + this.hosts = Arrays.asList(hosts); + return this; + } + + /** + * Sets the username used to authenticate the connection with the Elasticsearch cluster. 
+ * + * @param username of the Elasticsearch cluster user + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setConnectionUsername(String username) { + checkNotNull(username); + this.username = username; + return this; + } + + /** + * Sets the password used to authenticate the connection with the Elasticsearch cluster. + * + * @param password of the Elasticsearch cluster user + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setConnectionPassword(String password) { + checkNotNull(password); + this.password = password; + return this; + } + + /** + * Sets a prefix which used for every REST communication to the Elasticsearch cluster. + * + * @param prefix for the communication + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setConnectionPathPrefix(String prefix) { + checkNotNull(prefix); + this.connectionPathPrefix = prefix; + return this; + } + + /** + * Sets the timeout for requesting the connection of the Elasticsearch cluster from the + * connection manager. + * + * @param connectionRequestTimeout tiemout for the connection request Review comment: ```suggestion * @param connectionRequestTimeout timeout for the connection request ``` ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7SourceBuilder.java ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema; + +import org.apache.http.HttpHost; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The @builder class for {@link Elasticsearch7Source}. + * + * <p>The following example shows the minimum setup to create a ElasticsearchSource that reads the + * String values from an Elasticsearch index. 
+ * + * <pre>{@code + * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder() + * .setHosts(new HttpHost("localhost:9200") + * .setIndexName("my-index") + * .setDeserializationSchema(new Elasticsearch7SearchHitDeserializationSchema<String>() { + * @Override + * public void deserialize(SearchHit record, Collector<String> out) throws IOException { + * out.collect(record.getSourceAsString()); + * } + * + * @Override + * public TypeInformation<String> getProducedType() { + * return TypeInformation.of(String.class); + * } + * }) + * .build(); + * }</pre> + * + * <p>The ElasticsearchSource runs in a {@link Boundedness#BOUNDED} mode and stops when the entire + * index has been read. + * + * <p>Check the Java docs of each individual methods to learn more about the settings to build a + * ElasticsearchSource. + */ +@PublicEvolving +public class Elasticsearch7SourceBuilder<OUT> { + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7SourceBuilder.class); + + private Duration pitKeepAlive = Duration.ofMinutes(5); + private int numberOfSearchSlices = 1; + private String indexName; + private Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema; + + private List<HttpHost> hosts; + private String username; + private String password; + private String connectionPathPrefix; + private Integer connectionTimeout; + private Integer socketTimeout; + private Integer connectionRequestTimeout; + + Elasticsearch7SourceBuilder() {} + + private boolean isGreaterOrEqual(Duration d1, Duration d2) { + return ((d1.compareTo(d2) >= 0)); + } + + /** + * Sets the keep alive duration for the Point-in-Time (PIT). The PIT is required to a snapshot + * of the data in the index, to avoid reading the same data on subsequent search calls. If the + * PIT has expired the source will not be able to read data from this snapshot again. This also + * means that recovering from a checkpoint whose PIT has expired is not possible. 
+ * + * @param pitKeepAlive duration of the PIT keep alive + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setPitKeepAlive(Duration pitKeepAlive) { + checkNotNull(pitKeepAlive); + checkArgument( + isGreaterOrEqual(pitKeepAlive, Duration.ofMinutes(5)), + "PIT keep alive should be at least 5 minutes."); + this.pitKeepAlive = pitKeepAlive; + return this; + } + + /** + * Sets the number of search slices to be used to read the index. The number of search slices + * should be a multiple of the number of shards in your Elasticsearch cluster. + * + * @param numberOfSearchSlices the number of search slices + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setNumberOfSearchSlices(int numberOfSearchSlices) { + checkArgument(numberOfSearchSlices > 0, "Number of search slices must be greater than 0."); + this.numberOfSearchSlices = numberOfSearchSlices; + return this; + } + + /** + * Sets the name of the Elasticsearch index to be read. + * + * @param indexName name of the Elasticsearch index + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setIndexName(String indexName) { + checkNotNull(indexName); + this.indexName = indexName; + return this; + } + + /** + * Sets the {@link Elasticsearch7SearchHitDeserializationSchema} for the Elasticsearch source. + * The given schema will be used to deserialize {@link org.elasticsearch.search.SearchHit}s into + * the output type. + * + * @param deserializationSchema the deserialization schema to use + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setDeserializationSchema( + Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema) { + checkNotNull(deserializationSchema); + this.deserializationSchema = deserializationSchema; + return this; + } + + /** + * Sets the hosts where the Elasticsearch cluster nodes are reachable. 
+ * + * @param hosts http addresses describing the node locations + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setHosts(HttpHost... hosts) { + checkNotNull(hosts); + checkState(hosts.length > 0, "Hosts cannot be empty."); + this.hosts = Arrays.asList(hosts); + return this; + } + + /** + * Sets the username used to authenticate the connection with the Elasticsearch cluster. + * + * @param username of the Elasticsearch cluster user + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setConnectionUsername(String username) { + checkNotNull(username); + this.username = username; + return this; + } + + /** + * Sets the password used to authenticate the connection with the Elasticsearch cluster. + * + * @param password of the Elasticsearch cluster user + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setConnectionPassword(String password) { + checkNotNull(password); + this.password = password; + return this; + } + + /** + * Sets a prefix which used for every REST communication to the Elasticsearch cluster. + * + * @param prefix for the communication + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setConnectionPathPrefix(String prefix) { + checkNotNull(prefix); + this.connectionPathPrefix = prefix; + return this; + } + + /** + * Sets the timeout for requesting the connection of the Elasticsearch cluster from the Review comment: ```suggestion * Sets the timeout for requesting the connection to the Elasticsearch cluster from the ``` could be more clear ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/enumerator/Elasticsearch7SourceEnumerator.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.source.enumerator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.elasticsearch.common.ElasticsearchUtil; +import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig; +import org.apache.flink.connector.elasticsearch.source.Elasticsearch7SourceConfiguration; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split; +import org.apache.flink.util.FlinkRuntimeException; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.search.OpenPointInTimeRequest; +import org.elasticsearch.action.search.OpenPointInTimeResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.core.TimeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** The 
enumerator class for ElasticsearchSource. */ Review comment: It would be nice to have a brief description about the concepts this class relies on in ES (PIT, in particular). ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/reader/Elasticsearch7SearchHitDeserializationSchema.java ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.elasticsearch.source.reader; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.util.Collector; + +import org.elasticsearch.search.SearchHit; + +import java.io.IOException; +import java.io.Serializable; + +/** + * A schema bridge for deserializing Elasticsearch's {@link SearchHit} into a flink managed Review comment: ```suggestion * A schema bridge for deserializing Elasticsearch's {@link SearchHit} into a Flink managed ``` ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/enumerator/Elasticsearch7SourceEnumStateSerializer.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.elasticsearch.source.enumerator; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +/** + * The {@link org.apache.flink.core.io.SimpleVersionedSerializer Serializer} for the enumerator + * state of Elasticsearch source. + */ +@PublicEvolving +public class Elasticsearch7SourceEnumStateSerializer + implements SimpleVersionedSerializer<Elasticsearch7SourceEnumState> { + private static final int CURRENT_VERSION = 0; + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(Elasticsearch7SourceEnumState obj) throws IOException { + Set<Elasticsearch7Split> assignedSplits = obj.getAssignedSplits(); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos)) { + + out.writeInt(assignedSplits.size()); + for (Elasticsearch7Split split : assignedSplits) { + out.writeUTF(split.getPitId()); + out.writeInt(split.getSliceId()); + } + out.flush(); + + return baos.toByteArray(); + } + } + + @Override + public Elasticsearch7SourceEnumState deserialize(int version, byte[] serialized) + throws IOException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + DataInputStream in = new DataInputStream(bais)) { + + final int numSplits = in.readInt(); + Set<Elasticsearch7Split> splits = new HashSet<>(numSplits); Review comment: Consistency question: is there a reason all the remaining variables in this method are declared final, but this one is not? 
########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/split/Elasticsearch7Split.java ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.source.split; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.SourceSplit; + +import java.util.Objects; + +/** A {@link SourceSplit} for an Elasticsearch 'slice'. */ Review comment: Would be helpful to describe what the PIT is and how the actual splitting of data in Flink maps to the ES concepts. ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7SourceBuilder.java ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig; +import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema; + +import org.apache.http.HttpHost; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The @builder class for {@link Elasticsearch7Source}. + * + * <p>The following example shows the minimum setup to create a ElasticsearchSource that reads the + * String values from an Elasticsearch index. 
+ * + * <pre>{@code + * Elasticsearch7Source<String> source = new Elasticsearch7Source.builder() + * .setHosts(new HttpHost("localhost:9200") + * .setIndexName("my-index") + * .setDeserializationSchema(new Elasticsearch7SearchHitDeserializationSchema<String>() { + * @Override + * public void deserialize(SearchHit record, Collector<String> out) throws IOException { + * out.collect(record.getSourceAsString()); + * } + * + * @Override + * public TypeInformation<String> getProducedType() { + * return TypeInformation.of(String.class); + * } + * }) + * .build(); + * }</pre> + * + * <p>The ElasticsearchSource runs in a {@link Boundedness#BOUNDED} mode and stops when the entire + * index has been read. + * + * <p>Check the Java docs of each individual methods to learn more about the settings to build a + * ElasticsearchSource. + */ +@PublicEvolving +public class Elasticsearch7SourceBuilder<OUT> { + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7SourceBuilder.class); + + private Duration pitKeepAlive = Duration.ofMinutes(5); + private int numberOfSearchSlices = 1; + private String indexName; + private Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema; + + private List<HttpHost> hosts; + private String username; + private String password; + private String connectionPathPrefix; + private Integer connectionTimeout; + private Integer socketTimeout; + private Integer connectionRequestTimeout; + + Elasticsearch7SourceBuilder() {} + + private boolean isGreaterOrEqual(Duration d1, Duration d2) { + return ((d1.compareTo(d2) >= 0)); + } + + /** + * Sets the keep alive duration for the Point-in-Time (PIT). The PIT is required to a snapshot + * of the data in the index, to avoid reading the same data on subsequent search calls. If the + * PIT has expired the source will not be able to read data from this snapshot again. This also + * means that recovering from a checkpoint whose PIT has expired is not possible. 
+ * + * @param pitKeepAlive duration of the PIT keep alive + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setPitKeepAlive(Duration pitKeepAlive) { + checkNotNull(pitKeepAlive); + checkArgument( + isGreaterOrEqual(pitKeepAlive, Duration.ofMinutes(5)), + "PIT keep alive should be at least 5 minutes."); + this.pitKeepAlive = pitKeepAlive; + return this; + } + + /** + * Sets the number of search slices to be used to read the index. The number of search slices + * should be a multiple of the number of shards in your Elasticsearch cluster. + * + * @param numberOfSearchSlices the number of search slices + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setNumberOfSearchSlices(int numberOfSearchSlices) { + checkArgument(numberOfSearchSlices > 0, "Number of search slices must be greater than 0."); + this.numberOfSearchSlices = numberOfSearchSlices; + return this; + } + + /** + * Sets the name of the Elasticsearch index to be read. + * + * @param indexName name of the Elasticsearch index + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setIndexName(String indexName) { + checkNotNull(indexName); + this.indexName = indexName; + return this; + } + + /** + * Sets the {@link Elasticsearch7SearchHitDeserializationSchema} for the Elasticsearch source. + * The given schema will be used to deserialize {@link org.elasticsearch.search.SearchHit}s into + * the output type. + * + * @param deserializationSchema the deserialization schema to use + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setDeserializationSchema( + Elasticsearch7SearchHitDeserializationSchema<OUT> deserializationSchema) { + checkNotNull(deserializationSchema); + this.deserializationSchema = deserializationSchema; + return this; + } + + /** + * Sets the hosts where the Elasticsearch cluster nodes are reachable. 
+ * + * @param hosts http addresses describing the node locations + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setHosts(HttpHost... hosts) { + checkNotNull(hosts); + checkState(hosts.length > 0, "Hosts cannot be empty."); + this.hosts = Arrays.asList(hosts); + return this; + } + + /** + * Sets the username used to authenticate the connection with the Elasticsearch cluster. + * + * @param username of the Elasticsearch cluster user + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setConnectionUsername(String username) { + checkNotNull(username); + this.username = username; + return this; + } + + /** + * Sets the password used to authenticate the connection with the Elasticsearch cluster. + * + * @param password of the Elasticsearch cluster user + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setConnectionPassword(String password) { + checkNotNull(password); + this.password = password; + return this; + } + + /** + * Sets a prefix which is used for every REST communication to the Elasticsearch cluster. + * + * @param prefix for the communication + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setConnectionPathPrefix(String prefix) { + checkNotNull(prefix); + this.connectionPathPrefix = prefix; + return this; + } + + /** + * Sets the timeout for requesting the connection of the Elasticsearch cluster from the + * connection manager. + * + * @param connectionRequestTimeout timeout for the connection request + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setConnectionRequestTimeout( + int connectionRequestTimeout) { + checkState( + connectionRequestTimeout >= 0, + "Connection request timeout must be larger than or equal to 0."); + this.connectionRequestTimeout = connectionRequestTimeout; + return this; + } + + /** + * Sets the timeout for establishing a connection of the Elasticsearch cluster.
+ * + * @param connectionTimeout timeout for the connection + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setConnectionTimeout(int connectionTimeout) { + checkState(connectionTimeout >= 0, "Connection timeout must be larger than or equal to 0."); + this.connectionTimeout = connectionTimeout; + return this; + } + + /** + * Sets the timeout for waiting for data or, put differently, a maximum period of inactivity + * between two consecutive data packets. + * + * @param socketTimeout timeout for the socket + * @return this builder + */ + public Elasticsearch7SourceBuilder<OUT> setSocketTimeout(int socketTimeout) { + checkState(socketTimeout >= 0, "Socket timeout must be larger than or equal to 0."); + this.socketTimeout = socketTimeout; + return this; + } + + private void checkRequiredParameters() { Review comment: Required parameters should be part of builder's constructor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
