[
https://issues.apache.org/jira/browse/FLINK-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15821604#comment-15821604
]
ASF GitHub Bot commented on FLINK-4988:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/3112#discussion_r95969879
--- Diff:
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * <p>
+ * This class implements the common behaviour across Elasticsearch
versions, such as
+ * the use of an internal {@link BulkProcessor} to buffer multiple {@link
ActionRequest}s before
+ * sending the requests to the cluster, as well as passing input records
to the user provided
+ * {@link ElasticsearchSinkFunction} for processing.
+ *
+ * <p>
+ * The version-specific behaviours for creating a {@link Client} to
connect to an Elasticsearch cluster
+ * should be defined by concrete implementations of a {@link
ElasticsearchClientFactory}, which is to be provided to the
+ * constructor of this class.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T>
{
+
+ private static final long serialVersionUID = -1007596293618451942L;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
+ //
------------------------------------------------------------------------
+ // Internal bulk processor configuration
+ //
------------------------------------------------------------------------
+
+ public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS =
"bulk.flush.max.actions";
+ public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB =
"bulk.flush.max.size.mb";
+ public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS =
"bulk.flush.interval.ms";
+
+ private final Integer bulkProcessorFlushMaxActions;
+ private final Integer bulkProcessorFlushMaxSizeMb;
+ private final Integer bulkProcessorFlushIntervalMillis;
--- End diff --
Why are you using boxed types here?
> Elasticsearch 5.x support
> -------------------------
>
> Key: FLINK-4988
> URL: https://issues.apache.org/jira/browse/FLINK-4988
> Project: Flink
> Issue Type: New Feature
> Reporter: Mike Dias
>
> Elasticsearch 5.x was released:
> https://www.elastic.co/blog/elasticsearch-5-0-0-released
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)