[ 
https://issues.apache.org/jira/browse/BEAM-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Milan Nikl updated BEAM-10945:
------------------------------
    Description: 
h1. Environment configuration

At my company we use an [Elasticsearch cross-cluster search|https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-cross-cluster-search.html#ccs-supported-apis] setup. The cluster version is 7.9.1.

I intended to use ElasticsearchIO to read application logs and then produce aggregated data from them.
h1. Problem description
 # In a cross-cluster ES setup, the {{/<index>/_stats}} API is not available, so [ElasticsearchIO#getEstimatedSizeBytes|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L692] cannot compute the size properly.
 # The {{statsJson}} returned by the cluster looks like this:
{quote}\{
  "_shards" : \{ ... },
  "_all" : \{
    "primaries" : \{ ... },
    "total" : \{ }
  },
  "indices" : \{ }
}
{quote}
 # As a result, the {{totalCount}} value cannot be parsed from the JSON and defaults to {{0}}.
 # Consequently, {{estimatedByteSize}} [is set to 1|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L707] (which is itself a workaround for a similar issue).
 # {{ElasticsearchIO#getEstimatedSizeBytes}} is used in [BoundedReadEvaluatorFactory#getInitialInputs|https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java#L212], which does not check the returned value and divides it by {{targetParallelism}} using {{long}} arithmetic. For an estimate of {{1}}, this yields {{0}} for any {{targetParallelism > 1}}.
 # Then [ElasticsearchIO#split|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L665] is called with {{indexSize = 1}} and {{desiredBundleSizeBytes = 0}}. Dividing by zero in floating-point arithmetic sets {{nbBundlesFloat}} to infinity.
 # Even though the number of bundles is capped at {{1024}}, reading from 1024 {{BoundedElasticsearchSource}} instances concurrently makes ElasticsearchIO virtually impossible to use on the direct runner.
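To make the arithmetic in the last three steps concrete, here is a small stand-alone Java sketch. It does not call Beam: the class {{BundleArithmetic}} and its methods are hypothetical and only model the computations described above (long division in the direct runner, then float division, ceiling and the 1024 cap in {{split}}), using the values from this report: an estimated size of {{1}} and a direct runner parallelism of {{8}}.

```java
/**
 * Hypothetical model of the arithmetic described in this report.
 * Not Beam code: names only mirror the variables mentioned above.
 */
final class BundleArithmetic {

  /** Models the direct runner's per-bundle size: integer division of two longs. */
  static long bytesPerBundle(long estimatedBytes, int targetParallelism) {
    return estimatedBytes / targetParallelism; // 1 / 8 == 0 in long arithmetic
  }

  /** Models the bundle count computed in split: float division, ceiling, cap at 1024. */
  static int nbBundles(long indexSize, long desiredBundleSizeBytes) {
    // Float division by zero does not throw: 1.0f / 0 evaluates to Infinity.
    float nbBundlesFloat = (float) indexSize / desiredBundleSizeBytes;
    // Casting Infinity to int yields Integer.MAX_VALUE.
    int nbBundles = (int) Math.ceil(nbBundlesFloat);
    return Math.min(nbBundles, 1024); // cap mentioned in the report
  }

  public static void main(String[] args) {
    long estimatedByteSize = 1; // fallback value used when totalCount == 0
    int targetParallelism = 8;  // direct runner parallelism from the environment section
    long desired = bytesPerBundle(estimatedByteSize, targetParallelism);
    System.out.println("desiredBundleSizeBytes = " + desired);
    System.out.println("nbBundlesFloat = " + ((float) estimatedByteSize / desired));
    System.out.println("nbBundles = " + nbBundles(estimatedByteSize, desired));
  }
}
```

Because Java float division by zero yields {{Infinity}} rather than throwing, and casting {{Infinity}} to {{int}} gives {{Integer.MAX_VALUE}}, the bundle count silently hits the cap instead of failing fast.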

h1. Resolution suggestion

I haven't yet tested reading with ElasticsearchIO on a production runner (we use Flink 1.10.2), so I can neither confirm nor deny that it works with our Elasticsearch setup. For now, I'm just suggesting a few checks of input values so that the division by zero and the unnecessary parallelism are eliminated on the direct runner.
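A minimal sketch of the kind of input checks suggested above, assuming the 1024 cap from this report. {{BundleGuards}}, {{safeBytesPerBundle}} and {{safeNbBundles}} are hypothetical names, not existing Beam APIs, and where such guards would best live (the direct runner, {{ElasticsearchIO#split}}, or both) is left open.

```java
/**
 * Hypothetical guards illustrating the suggested input checks.
 * None of these names exist in Beam; the 1024 cap is taken from the report.
 */
final class BundleGuards {
  static final int MAX_SLICES = 1024; // cap mentioned in the report

  /** Never lets the per-bundle size drop to 0 when the size estimate is tiny. */
  static long safeBytesPerBundle(long estimatedBytes, int targetParallelism) {
    if (targetParallelism <= 0) {
      throw new IllegalArgumentException("targetParallelism must be positive");
    }
    return Math.max(1L, estimatedBytes / targetParallelism);
  }

  /** Falls back to a single bundle when either size is non-positive instead of dividing by zero. */
  static int safeNbBundles(long indexSize, long desiredBundleSizeBytes) {
    if (indexSize <= 0 || desiredBundleSizeBytes <= 0) {
      return 1;
    }
    // Ceiling division in long arithmetic (both operands are positive here).
    long nbBundles = indexSize / desiredBundleSizeBytes
        + (indexSize % desiredBundleSizeBytes == 0 ? 0 : 1);
    return (int) Math.min(nbBundles, MAX_SLICES);
  }

  public static void main(String[] args) {
    long desired = safeBytesPerBundle(1, 8); // values from this report
    System.out.println("desiredBundleSizeBytes = " + desired);
    System.out.println("nbBundles = " + safeNbBundles(1, desired));
    System.out.println("nbBundles (zero bundle size) = " + safeNbBundles(1, 0));
  }
}
```

With a floor of {{1}} byte per bundle and a single-bundle fallback for non-positive sizes, a read with no usable size estimate ends up on one {{BoundedElasticsearchSource}} instead of 1024.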


> ElasticsearchIO performs 0 division on DirectRunner
> ---------------------------------------------------
>
>                 Key: BEAM-10945
>                 URL: https://issues.apache.org/jira/browse/BEAM-10945
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-elasticsearch, runner-direct
>    Affects Versions: 2.23.0
>         Environment: * Beam 2.23
> * Java 1.8.0_265
> * Ubuntu 16.04
> * Elastic version of cluster 7.9.1, cross cluster setup
> * Parallelism of direct runner 8
>            Reporter: Milan Nikl
>            Priority: P2
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
