fapaul commented on a change in pull request #18153:
URL: https://github.com/apache/flink/pull/18153#discussion_r791763232
##########
File path: flink-connectors/flink-connector-elasticsearch7/pom.xml
##########
@@ -91,6 +91,14 @@ under the License.
</exclusions>
</dependency>
+ <!-- Dependnecy for Table API integration -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
Review comment:
Are you sure `provided` is correct here? Either the connector actually
requires the dependency, or it should be declared `optional`.
##########
File path: flink-connectors/flink-connector-elasticsearch7/pom.xml
##########
@@ -91,6 +91,14 @@ under the License.
</exclusions>
</dependency>
+ <!-- Dependnecy for Table API integration -->
Review comment:
```suggestion
<!-- Dependency for Table API integration -->
```
##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/enumerator/Elasticsearch7SourceEnumState.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.enumerator;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/** The state of {@link Elasticsearch7SourceEnumerator}. */
+@PublicEvolving
+public class Elasticsearch7SourceEnumState {
+
+ private final Set<Elasticsearch7Split> assignedSplits;
+
+ Elasticsearch7SourceEnumState(Set<Elasticsearch7Split> assignedSplits) {
+ this.assignedSplits = assignedSplits;
+ }
+
+ public Set<Elasticsearch7Split> getAssignedSplits() {
+ return assignedSplits;
+ }
+
+ public static Elasticsearch7SourceEnumState fromCollectionSnapshot(
+ final Collection<Elasticsearch7Split> splits) {
+
+ // create a copy to make sure the checkpoint is immutable
+ final Collection<Elasticsearch7Split> copy = new ArrayList<>(splits);
+ return new Elasticsearch7SourceEnumState(new HashSet<>(copy));
Review comment:
Isn't this already a copy? The split objects themselves are still shared
via their references even if you add another copy step.
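For illustration, a minimal sketch of what I mean (the intermediate
`ArrayList` adds nothing; the split instances are shared references either
way):
```java
public static Elasticsearch7SourceEnumState fromCollectionSnapshot(
        final Collection<Elasticsearch7Split> splits) {
    // new HashSet<>(splits) already creates an independent container for the
    // checkpoint; only the Elasticsearch7Split element references are shared,
    // and the extra ArrayList copy does not change that.
    return new Elasticsearch7SourceEnumState(new HashSet<>(splits));
}
```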
##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7SourceConfiguration.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.http.HttpHost;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.List;
+
+/** Configuration class for the {@link Elasticsearch7Source}. */
+@Internal
+public class Elasticsearch7SourceConfiguration implements Serializable {
+ private final List<HttpHost> hosts;
+ private final String index;
+ private final int numberOfSlices;
+ private final Duration pitKeepAlive;
+
+ @VisibleForTesting
Review comment:
I do not think the annotation is strictly necessary because the ctor is
also used for non-testing purposes, and only its visibility is required for
the tests. Since all other methods are also public, I would remove the
annotation.
##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/enumerator/Elasticsearch7SourceEnumerator.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.elasticsearch.common.ElasticsearchUtil;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import org.apache.flink.connector.elasticsearch.source.Elasticsearch7SourceConfiguration;
+import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.search.OpenPointInTimeRequest;
+import org.elasticsearch.action.search.OpenPointInTimeResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.core.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** The enumerator class for ElasticsearchSource. */
+@Internal
+public class Elasticsearch7SourceEnumerator
+ implements SplitEnumerator<Elasticsearch7Split, Elasticsearch7SourceEnumState> {
+ private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7SourceEnumerator.class);
+
+ private final Elasticsearch7SourceConfiguration sourceConfiguration;
+
+ private final NetworkClientConfig networkClientConfig;
+
+ private final SplitEnumeratorContext<Elasticsearch7Split> context;
+
+ private ArrayList<Elasticsearch7Split> splits;
+
+ private RestHighLevelClient client;
+
+ public Elasticsearch7SourceEnumerator(
+ Elasticsearch7SourceConfiguration sourceConfiguration,
+ NetworkClientConfig networkClientConfig,
+ SplitEnumeratorContext<Elasticsearch7Split> context) {
+ this(sourceConfiguration, networkClientConfig, context, Collections.emptySet());
+ }
+
+ public Elasticsearch7SourceEnumerator(
+ Elasticsearch7SourceConfiguration sourceConfiguration,
+ NetworkClientConfig networkClientConfig,
+ SplitEnumeratorContext<Elasticsearch7Split> context,
+ Collection<Elasticsearch7Split> restoredSplits) {
+ this.sourceConfiguration = sourceConfiguration;
+ this.networkClientConfig = networkClientConfig;
+ this.context = context;
+ this.splits = new ArrayList<>(restoredSplits);
+ }
+
+ @Override
+ public void start() {
+ client = getRestClient();
+ LOG.info("Starting the ElasticsearchSourceEnumerator.");
+
+ if (splits.isEmpty()) {
+ context.callAsync(this::initializePointInTime, this::handlePointInTimeCreation);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ @Override
+ public void addReader(int subtaskId) {
+ // this source is purely lazy-pull-based, nothing to do upon registration
Review comment:
I am afraid we have a problem here. Imagine the following scenario: the
enumerator starts and asynchronously requests the point in time to create
the splits. This call runs in the background and may take a while. In the
meantime, multiple readers register, but since no splits have been created
yet, they all receive the signal for no more splits.
We either have to use a push-based model like Kafka does or wait until the
point in time has been fetched before sending the no-more-splits signal.
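A rough sketch of the waiting variant (illustrative only; `pitCreated`,
`pendingReaders`, and `assignSplitsOrSignalNoMoreSplits` do not exist in
this PR, and the handler signature is assumed):
```java
// Buffer reader registrations until the point-in-time call has completed.
private final Set<Integer> pendingReaders = new HashSet<>();
private boolean pitCreated = false;

@Override
public void addReader(int subtaskId) {
    if (!pitCreated) {
        // The async point-in-time request is still in flight; remember the
        // reader instead of signaling "no more splits" prematurely.
        pendingReaders.add(subtaskId);
    } else {
        assignSplitsOrSignalNoMoreSplits(subtaskId);
    }
}

private void handlePointInTimeCreation(OpenPointInTimeResponse response, Throwable t) {
    // ... existing error handling and split creation ...
    pitCreated = true;
    pendingReaders.forEach(this::assignSplitsOrSignalNoMoreSplits);
    pendingReaders.clear();
}
```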
##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/reader/Elasticsearch7RecordEmitter.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.reader;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7SplitState;
+
+/** The {@link RecordEmitter} implementation for both {@link Elasticsearch7SourceReader}. */
+public class Elasticsearch7RecordEmitter<T>
Review comment:
You should add a stability annotation (e.g. `@Internal` or
`@PublicEvolving`) to the public classes inside a connector module.
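For example, assuming this emitter is not meant to be user-facing API:
```java
import org.apache.flink.annotation.Internal;

/** The {@link RecordEmitter} implementation for {@link Elasticsearch7SourceReader}. */
@Internal
public class Elasticsearch7RecordEmitter<T>
        implements RecordEmitter<Elasticsearch7Record<T>, T, Elasticsearch7SplitState> {
    // ...
}
```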
##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/enumerator/Elasticsearch7SourceEnumStateSerializer.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.enumerator;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * The {@link org.apache.flink.core.io.SimpleVersionedSerializer Serializer} for the enumerator
+ * state of Elasticsearch source.
+ */
+@PublicEvolving
+public class Elasticsearch7SourceEnumStateSerializer
Review comment:
Does this class need to be public? I think it is not part of any public
API, and in the source we only return the interface `SimpleVersionedSerializer`.
##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/reader/Elasticsearch7RecordEmitter.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.reader;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7SplitState;
+
+/** The {@link RecordEmitter} implementation for both {@link Elasticsearch7SourceReader}. */
+public class Elasticsearch7RecordEmitter<T>
+ implements RecordEmitter<Elasticsearch7Record<T>, T, Elasticsearch7SplitState> {
+
+ @Override
+ public void emitRecord(
+ Elasticsearch7Record<T> element,
+ SourceOutput<T> output,
+ Elasticsearch7SplitState splitState)
+ throws Exception {
+ // Sink the record to source output.
+ output.collect(element.getValue());
Review comment:
Why do you not do the deserialization here? I'd expect this class to
handle the transformation from the `SearchHit` to the user-defined class.
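Roughly what I would expect here (a sketch only; it assumes the split
reader hands over the raw `SearchHit` and the emitter owns the
deserialization schema):
```java
public class Elasticsearch7RecordEmitter<T>
        implements RecordEmitter<SearchHit, T, Elasticsearch7SplitState> {

    private final Elasticsearch7SearchHitDeserializationSchema<T> deserializationSchema;

    public Elasticsearch7RecordEmitter(
            Elasticsearch7SearchHitDeserializationSchema<T> deserializationSchema) {
        this.deserializationSchema = deserializationSchema;
    }

    @Override
    public void emitRecord(
            SearchHit element, SourceOutput<T> output, Elasticsearch7SplitState splitState)
            throws Exception {
        // Transform the raw SearchHit into the user-defined type here and
        // forward every produced element to the source output. A reusable
        // Collector field would avoid the per-record allocation shown below.
        deserializationSchema.deserialize(
                element,
                new Collector<T>() {
                    @Override
                    public void collect(T record) {
                        output.collect(record);
                    }

                    @Override
                    public void close() {}
                });
    }
}
```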
##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/split/Elasticsearch7SplitState.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.split;
+
+/**
+ * This class extends {@link Elasticsearch7Split} for potential tracking of additional (meta-)data.
+ */
+public class Elasticsearch7SplitState extends Elasticsearch7Split {
Review comment:
`@Internal` or package-private?
##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/reader/Elasticsearch7SearchHitReader.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.reader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.elasticsearch.common.ElasticsearchUtil;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import org.apache.flink.connector.elasticsearch.source.Elasticsearch7SourceConfiguration;
+import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.PointInTimeBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.slice.SliceBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.search.sort.SortBuilders;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+/** This class reads all available {@link SearchHit}s from a {@link Elasticsearch7Split}. */
+@Internal
+class Elasticsearch7SearchHitReader implements Closeable {
+
+ private final Elasticsearch7SourceConfiguration sourceConfiguration;
+ private final NetworkClientConfig networkClientConfig;
+ private final Elasticsearch7Split split;
+
+ private final RestHighLevelClient client;
+
+ private Object[] searchAfterSortValues;
+
+ static Elasticsearch7SearchHitReader createReader(
Review comment:
Nit: Why did you choose to offer a static init method? I usually use
them if I have to transform the input somehow before passing it to the ctor.
##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/split/Elasticsearch7SplitSerializer.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.split;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * The {@link org.apache.flink.core.io.SimpleVersionedSerializer serializer} for {@link
+ * Elasticsearch7Split}.
+ */
+public class Elasticsearch7SplitSerializer
Review comment:
Could be `@Internal`.
##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSource.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.elasticsearch.source.Elasticsearch7Source;
+import org.apache.flink.connector.elasticsearch.source.Elasticsearch7SourceBuilder;
+import org.apache.flink.connector.elasticsearch.source.reader.Elasticsearch7SearchHitDeserializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.http.HttpHost;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** An Elasticsearch {@link ScanTableSource}. */
+@Internal
+public class Elasticsearch7DynamicSource implements ScanTableSource {
+
+ /** Data type that describes the final output of the source. */
+ private final DataType producedDataType;
+
+ private final Elasticsearch7DynamicSourceConfiguration sourceConfig;
+
+ private final String tableIdentifier;
+
+ private final boolean failOnMissingFields;
+
+ private final boolean ignoreParseErrors;
+
+ private final TimestampFormat timestampFormat;
+
+ public Elasticsearch7DynamicSource(
+ DataType producedDataType,
+ Elasticsearch7DynamicSourceConfiguration sourceConfig,
+ String tableIdentifier,
+ boolean failOnMissingFields,
+ boolean ignoreParseErrors,
+ TimestampFormat timestampFormat) {
+ this.producedDataType = checkNotNull(producedDataType);
+ this.sourceConfig = checkNotNull(sourceConfig);
+ this.tableIdentifier = checkNotNull(tableIdentifier);
+ this.failOnMissingFields = failOnMissingFields;
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.timestampFormat = checkNotNull(timestampFormat);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+ final TypeInformation<RowData> producedTypeInformation =
+ context.createTypeInformation(producedDataType);
+
+ final Elasticsearch7SearchHitDeserializationSchema<RowData> elasticsearchDeserializer =
+ new RowDataElasticsearch7SearchHitDeserializationSchema(
+ (RowType) producedDataType.getLogicalType(),
+ producedTypeInformation,
+ failOnMissingFields,
+ ignoreParseErrors,
+ timestampFormat);
+
+ final Elasticsearch7Source<RowData> elasticsearchSource =
+ createElasticsearchSource(elasticsearchDeserializer);
+
+ return new DataStreamScanProvider() {
+ @Override
+ public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
+ return execEnv.fromSource(
+ elasticsearchSource,
+ WatermarkStrategy.noWatermarks(),
Review comment:
I am not sure we can simply ignore the watermarks. Let's clarify that.
##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/enumerator/Elasticsearch7SourceEnumState.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.enumerator;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/** The state of {@link Elasticsearch7SourceEnumerator}. */
+@PublicEvolving
+public class Elasticsearch7SourceEnumState {
+
+ private final Set<Elasticsearch7Split> assignedSplits;
+
+ Elasticsearch7SourceEnumState(Set<Elasticsearch7Split> assignedSplits) {
+ this.assignedSplits = assignedSplits;
Review comment:
Nit: checkNotNull
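For example (the connector already statically imports
`org.apache.flink.util.Preconditions.checkNotNull` elsewhere):
```java
Elasticsearch7SourceEnumState(Set<Elasticsearch7Split> assignedSplits) {
    // Fail fast at construction time instead of with a late NPE.
    this.assignedSplits = checkNotNull(assignedSplits);
}
```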
##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/reader/Elasticsearch7SearchHitDeserializationSchema.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import org.elasticsearch.search.SearchHit;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A schema bridge for deserializing Elasticsearch's {@link SearchHit} into a flink managed
+ * instance.
+ *
+ * @param <T> The output message type for sinking to downstream flink operator.
+ */
+@PublicEvolving
+public interface Elasticsearch7SearchHitDeserializationSchema<T>
+ extends Serializable, ResultTypeQueryable<T> {
+
+ /**
+ * Initialization method for the schema. It is called before the actual working methods {@link
+ * #deserialize} and thus suitable for one time setup work.
+ *
+ * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access
+ * additional features such as e.g. registering user metrics.
+ *
+ * @param context Contextual information that can be used during initialization.
+ */
+ @PublicEvolving
Review comment:
It is not necessary to annotate methods; they automatically inherit the
annotation from the class/interface.
##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/Elasticsearch7SourceConfiguration.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.http.HttpHost;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.List;
+
+/** Configuration class for the {@link Elasticsearch7Source}. */
+@Internal
+public class Elasticsearch7SourceConfiguration implements Serializable {
+ private final List<HttpHost> hosts;
+ private final String index;
+ private final int numberOfSlices;
+ private final Duration pitKeepAlive;
+
+ @VisibleForTesting
+ public Elasticsearch7SourceConfiguration(
+ List<HttpHost> hosts, String index, int numberOfSlices, Duration pitKeepAlive) {
+ this.hosts = hosts;
+ this.index = index;
+ this.numberOfSlices = numberOfSlices;
+ this.pitKeepAlive = pitKeepAlive;
Review comment:
Nit: checkNotNull
##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/reader/Elasticsearch7Record.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.reader;
+
+import org.apache.flink.annotation.Internal;
+
+/** The record instance of the Elasticsearch source. */
+@Internal
+public class Elasticsearch7Record<T> {
Review comment:
Why do you need this class instead of just using the generic type directly?
##########
File path: flink-connectors/flink-connector-elasticsearch7/src/test/resources/log4j2-test.properties
##########
@@ -18,7 +18,7 @@
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
-rootLogger.level = OFF
Review comment:
Please revert this line before doing the final submission.
##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/source/reader/Elasticsearch7SplitReader.java
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.source.reader;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.elasticsearch.common.NetworkClientConfig;
+import org.apache.flink.connector.elasticsearch.source.Elasticsearch7SourceConfiguration;
+import org.apache.flink.connector.elasticsearch.source.split.Elasticsearch7Split;
+import org.apache.flink.util.Collector;
+
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link SplitReader} implementation that reads {@link Elasticsearch7Record}s from {@link
+ * Elasticsearch7Split}s.
+ *
+ * @param <T> the type of the record to be emitted from the Source.
+ */
+public class Elasticsearch7SplitReader<T>
+ implements SplitReader<Elasticsearch7Record<T>, Elasticsearch7Split> {
+ private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7SplitReader.class);
+ private final Elasticsearch7SourceConfiguration sourceConfiguration;
+ private final NetworkClientConfig networkClientConfig;
+ private final Queue<Elasticsearch7Split> splits;
+
+ @Nullable private Elasticsearch7SearchHitReader currentReader;
+ @Nullable private String currentSplitId;
+
+ private final Elasticsearch7SearchHitDeserializationSchema<T> deserializationSchema;
+ private final SimpleCollector<T> collector;
+
+ public Elasticsearch7SplitReader(
+ Elasticsearch7SourceConfiguration sourceConfiguration,
+ NetworkClientConfig networkClientConfig,
+ Elasticsearch7SearchHitDeserializationSchema<T> deserializationSchema) {
+ this.sourceConfiguration = sourceConfiguration;
+ this.networkClientConfig = networkClientConfig;
+ this.deserializationSchema = deserializationSchema;
+ this.splits = new ArrayDeque<>();
+ this.collector = new SimpleCollector<>();
+ }
+
+ private RecordsWithSplitIds<Elasticsearch7Record<T>> createRecordsFromSearchResults(
+ Collection<SearchHit> searchHits) throws IOException {
+ Collection<Elasticsearch7Record<T>> recordsForSplit = new ArrayList<>();
+ for (SearchHit hit : searchHits) {
+ try {
+ deserializationSchema.deserialize(hit, collector);
+ collector
+ .getRecords()
+ .forEach(r -> recordsForSplit.add(new Elasticsearch7Record<>(r)));
+ } catch (Exception e) {
+ throw new IOException("Failed to deserialize consumer record due to", e);
+ } finally {
+ collector.reset();
+ }
+ }
+
+ return ElasticsearchSplitRecords.forRecords(currentSplitId, recordsForSplit);
+ }
+
+ @Override
+ public RecordsWithSplitIds<Elasticsearch7Record<T>> fetch() throws IOException {
Review comment:
I think you want to return the `SearchHit`s from this method to allow
better scalability. The idea behind the split reader is to do the basic IO
task (reading from Elasticsearch) and return the elements as fast as
possible so that the source readers can immediately consume them. Right now
the deserialization and the IO task are coupled, which may lead to bad
performance.
The fetch method is used internally to push the elements to a concurrent
queue that hands them over from the split reader threads to the source
reader.
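Roughly (a sketch that assumes the deserialization moves into the
`RecordEmitter`; `checkSplitOrStartNext`, `readNextSearchHits`, and
`finishSplit` are illustrative helper names):
```java
public class Elasticsearch7SplitReader implements SplitReader<SearchHit, Elasticsearch7Split> {

    @Override
    public RecordsWithSplitIds<SearchHit> fetch() throws IOException {
        checkSplitOrStartNext();
        // Return the raw hits immediately so this fetcher thread can go back
        // to IO; the RecordEmitter deserializes them on the source reader side.
        Collection<SearchHit> hits = currentReader.readNextSearchHits();
        return hits == null
                ? finishSplit()
                : ElasticsearchSplitRecords.forRecords(currentSplitId, hits);
    }
}
```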
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]