[
https://issues.apache.org/jira/browse/BEAM-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17480088#comment-17480088
]
Evan Galpin commented on BEAM-10945:
------------------------------------
[~echauchot] I think my terminology was confusing, given that `bundle` is a
kind of primitive throughout Beam runners. What I was referring to was the
`nbBundles` variable, which is currently capped at 1024[1].
Tracing the usage of `nbBundles`, its value is used throughout
`BoundedElasticsearchSource` as `numSlices`. If there were a user setting to
enforce a different max than 1024 for `nbBundles`, I feel that might
give the control required to address part of the issue here, where 1024
parallel sources/slices overwhelm a single machine that's running the
DirectRunner and reading from a cross-cluster configuration.
Ex. (heavily pseudo-code):
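{code:java}
// hypothetical setter; sets this.userSuppliedMaxSplits = 4
ESRead.withMaxSplits(4);

// in split() method
...
// use the user-supplied value when present, otherwise the computed count
int nbBundles = ObjectUtils.firstNonNull(
    spec.getUserSuppliedMaxSplits(), (int) Math.ceil(nbBundlesFloat));
// the existing hard cap still applies
if (nbBundles > 1024) {
  nbBundles = 1024;
}
{code}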
Thoughts?
[1]
[https://github.com/apache/beam/blob/f31cad083986836e03bc1305d56efc969f20630a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L856-L857]
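To make the idea concrete, here is a minimal standalone sketch of how a user-supplied limit could combine with the existing 1024 cap. Everything in it is an assumption for illustration: the class `SplitCountSketch`, the method `resolveNbBundles`, and the constant name are hypothetical and not part of ElasticsearchIO. Unlike the pseudo-code above, it treats the user value as an upper bound (via `Math.min`) rather than as an override, which may or may not be the behavior we want.

```java
final class SplitCountSketch {
  // Mirrors the hard cap of 1024 mentioned above; the name is hypothetical.
  static final int HARD_MAX_BUNDLES = 1024;

  /**
   * Resolves the number of bundles/slices.
   *
   * @param userSuppliedMaxSplits optional user limit, or null when not configured
   * @param nbBundlesFloat computed ratio indexSize / desiredBundleSizeBytes
   */
  static int resolveNbBundles(Integer userSuppliedMaxSplits, float nbBundlesFloat) {
    // Casting +Infinity to int yields Integer.MAX_VALUE, so the cap below still applies.
    int computed = (int) Math.ceil(nbBundlesFloat);
    // Never return fewer than one bundle.
    computed = Math.max(1, computed);

    // The effective cap is the smaller of the user limit and the hard cap.
    int cap = HARD_MAX_BUNDLES;
    if (userSuppliedMaxSplits != null) {
      cap = Math.min(Math.max(1, userSuppliedMaxSplits), HARD_MAX_BUNDLES);
    }
    return Math.min(computed, cap);
  }
}
```

With a limit of 4, an infinite `nbBundlesFloat` resolves to 4 slices instead of 1024, while a pipeline that sets no limit keeps the current behavior.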
> ElasticsearchIO performs 0 division on DirectRunner
> ---------------------------------------------------
>
> Key: BEAM-10945
> URL: https://issues.apache.org/jira/browse/BEAM-10945
> Project: Beam
> Issue Type: Bug
> Components: io-java-elasticsearch, runner-direct
> Affects Versions: 2.23.0
> Environment: Beam 2.23
> Java 1.8.0_265
> Ubuntu 16.04
> Elastic version of cluster 7.9.1, cross cluster setup
> Parallelism of direct runner 8
> Reporter: Milan Nikl
> Priority: P3
>
> h1. Environment configuration
> In my company we use [Elasticsearch cross
> cluster|https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-cross-cluster-search.html#ccs-supported-apis]
> setup for search. Cluster version is 7.9.1.
> I intended to use ElasticsearchIO for reading application logs and
> subsequently producing some aggregated data.
> h1. Problem description
> # In a cross-cluster ES setup, there is no {{/<index>/_stats}} API available,
> so it is not possible to compute
> [ElasticsearchIO#getEstimatedSizeBytes|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L692]
> properly.
> # {{statsJson}} returned by the cluster looks like this:
> {quote}
> \{ "_shards" : \{ ... \},
> "_all" : \{ "primaries" : \{ ... \}, "total" : \{ \} \},
> "indices" : \{ \} \}
> {quote}
> # That means that the {{totalCount}} value cannot be parsed from the JSON and is
> thus set to {{0}}.
> # As a result, the {{estimatedByteSize}} value [is set to
> 1|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L707]
> (which is itself a workaround for a similar issue).
> # {{ElasticsearchIO#getEstimatedSizeBytes}} is used in
> [BoundedReadEvaluatorFactory#getInitialInputs|https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java#L212]
> which does not check the value and performs division of two {{long}} values,
> which of course results in {{0}} for any {{targetParallelism > 1}}.
> # Then
> [ElasticsearchIO#split|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L665]
> is called with {{indexSize = 1}} and {{desiredBundleSizeBytes = 0}}, which
> sets the {{nbBundlesFloat}} value to infinity.
> # Even though the number of bundles is capped at {{1024}}, reading from 1024
> BoundedElasticsearchSources concurrently makes ElasticsearchIO virtually
> impossible to use on the direct runner.
> h1. Resolution suggestion
> I still haven't tested reading from ElasticsearchIO on a proper runner (we use
> Flink 1.10.2), so I can neither confirm nor deny that it works on our
> Elastic setup. At the moment I'm just suggesting a few checks of input values
> so that the zero-division and unnecessary-parallelism problems are eliminated on
> the direct runner.
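The arithmetic described in the issue can be reproduced in isolation. The sketch below is only an illustration under stated assumptions: the class `ZeroDivisionSketch` and its method names are hypothetical, and the formulas are simplified from the issue text rather than copied from Beam. It shows how an estimated size of 1 byte divided across 8 workers becomes 0 under `long` division, how dividing by that 0 as a `float` yields infinity, and how one possible input check (clamping the bundle size to at least 1) avoids both.

```java
final class ZeroDivisionSketch {
  // Simplified form of the direct runner's step: long division truncates toward zero.
  static long bytesPerBundle(long estimatedSizeBytes, int targetParallelism) {
    return estimatedSizeBytes / targetParallelism;
  }

  // Simplified form of the split step: float division by zero gives Infinity, not an exception.
  static float nbBundlesFloat(long indexSize, long desiredBundleSizeBytes) {
    return (float) indexSize / desiredBundleSizeBytes;
  }

  // One possible guard (an assumption, not the actual fix): never go below 1 byte per bundle.
  static long guardedBytesPerBundle(long estimatedSizeBytes, int targetParallelism) {
    return Math.max(1L, estimatedSizeBytes / targetParallelism);
  }

  public static void main(String[] args) {
    long unguarded = bytesPerBundle(1L, 8);
    System.out.println("unguarded bytesPerBundle = " + unguarded);
    System.out.println("nbBundlesFloat = " + nbBundlesFloat(1L, unguarded));

    long guarded = guardedBytesPerBundle(1L, 8);
    System.out.println("guarded bytesPerBundle = " + guarded);
    System.out.println("nbBundlesFloat = " + nbBundlesFloat(1L, guarded));
  }
}
```

Clamping on the caller side is only one option; the same check could instead live in the source's split logic, which would protect it against any runner that passes a zero bundle size.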