[ 
https://issues.apache.org/jira/browse/BEAM-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Milan Nikl updated BEAM-10945:
------------------------------
    Description: 
h1. Environment configuration

At my company we use an [Elasticsearch cross-cluster|https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-cross-cluster-search.html#ccs-supported-apis]
 setup for search. The cluster version is 7.9.1.

I intended to use ElasticsearchIO for reading application logs and subsequently 
producing some aggregated data.
h1. Problem description
 # In a cross-cluster ES setup, the {{/<index>/_stats}} API is not available, so 
[ElasticsearchIO#getEstimatedSizeBytes|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L692]
 cannot compute a proper size estimate.
 # {{statsJson}} returned by the cluster looks like this:
{quote}\{
 "_shards" : \{ ... },
 "_all" : \{
 "primaries" : \{ ... },
 "total" : \{ }
 },
 "indices" : \{ }
 }
{quote}
 # As a result, the {{totalCount}} value cannot be parsed from the JSON and is 
set to {{0}}.
 # Consequently, the {{estimatedByteSize}} value [is set to 
1|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L707]
 (which is itself a workaround for a similar issue).
 # {{ElasticsearchIO#getEstimatedSizeBytes}} is used in 
[BoundedReadEvaluatorFactory#getInitialInputs|https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java#L212]
 which does not check the value and divides it by {{targetParallelism}} as two {{long}} values. 
With an estimated size of {{1}}, the result is {{0}} for any {{targetParallelism > 1}}.
 # Then 
[ElasticsearchIO#split|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L665]
 is called with {{indexSize = 1}} and {{desiredBundleSizeBytes = 0}}, which 
sets the {{nbBundlesFloat}} value to infinity.
 # Even though the number of bundles is capped at {{1024}}, reading from 1024 
BoundedElasticsearchSources concurrently makes ElasticsearchIO virtually 
impossible to use on the direct runner.
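
The arithmetic behind the last three steps can be reproduced in isolation. The following is only a minimal sketch, not the Beam code: the class {{ZeroBundleSizeDemo}} and its method names are hypothetical, and the two methods merely imitate the {{long}} division in the direct runner and the {{float}} division plus the 1024 cap in the split step as described above.

```java
// Hypothetical stand-alone demo; names follow the issue text, this is not Beam code.
final class ZeroBundleSizeDemo {
    // Upper bound on the number of bundles, as mentioned in the issue.
    static final int MAX_BUNDLES = 1024;

    // Imitates the direct runner step: integer division of two long values.
    static long bytesPerBundle(long estimatedSizeBytes, int targetParallelism) {
        return estimatedSizeBytes / targetParallelism;
    }

    // Imitates the split step: float division, rounding up, then the cap.
    static int bundleCount(long indexSize, long desiredBundleSizeBytes) {
        // Dividing a float by zero does not throw; it yields Infinity.
        float nbBundlesFloat = (float) indexSize / desiredBundleSizeBytes;
        // Casting an infinite double to int saturates at Integer.MAX_VALUE.
        int nbBundles = (int) Math.ceil(nbBundlesFloat);
        return Math.min(nbBundles, MAX_BUNDLES);
    }

    public static void main(String[] args) {
        long perBundle = bytesPerBundle(1L, 8); // estimated size 1, parallelism 8
        System.out.println("bytesPerBundle = " + perBundle);
        System.out.println("bundleCount    = " + bundleCount(1L, perBundle));
    }
}
```

With an estimated size of 1 and a parallelism of 8 (the value from the environment description), the first method returns 0 and the second one ends up at the cap of 1024.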

h1. Resolution suggestion

I haven't yet tested reading from ElasticsearchIO on a proper runner (we use 
Flink 1.10.2), so I can neither confirm nor deny that it works with our 
Elasticsearch setup. For now I'm just suggesting a few checks of input values so 
that the zero division and unnecessary parallelism problems are eliminated on the 
direct runner.
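
The kind of input checks meant here could look roughly like the sketch below. It is an illustration under assumptions, not a proposed patch: the class {{SplitGuards}}, both method names and the fallback to a single bundle are hypothetical choices, and the real fix may belong in the runner, in ElasticsearchIO, or in both.

```java
// Hypothetical helper illustrating the suggested input checks; not Beam code.
final class SplitGuards {
    // Never hand a bundle size below 1 byte to the source.
    static long safeBytesPerBundle(long estimatedSizeBytes, int targetParallelism) {
        if (targetParallelism < 1) {
            throw new IllegalArgumentException("targetParallelism must be >= 1");
        }
        return Math.max(1L, estimatedSizeBytes / targetParallelism);
    }

    // Fall back to a single bundle when the inputs carry no usable size information.
    static int safeBundleCount(long indexSize, long desiredBundleSizeBytes, int maxBundles) {
        if (indexSize <= 0 || desiredBundleSizeBytes <= 0) {
            return 1;
        }
        // Ceiling division on long values, written so it cannot overflow.
        long bundles = indexSize / desiredBundleSizeBytes
                + (indexSize % desiredBundleSizeBytes == 0 ? 0 : 1);
        return (int) Math.min(bundles, (long) maxBundles);
    }

    public static void main(String[] args) {
        long perBundle = safeBytesPerBundle(1L, 8);
        System.out.println("bytesPerBundle = " + perBundle);
        System.out.println("bundleCount    = " + safeBundleCount(1L, perBundle, 1024));
    }
}
```

With these checks, an estimated size of 1 and a parallelism of 8 lead to a single bundle instead of 1024.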

> ElasticsearchIO performs 0 division on DirectRunner
> ---------------------------------------------------
>
>                 Key: BEAM-10945
>                 URL: https://issues.apache.org/jira/browse/BEAM-10945
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-elasticsearch, runner-direct
>    Affects Versions: 2.23.0
>         Environment: Beam 2.23
> Java 1.8.0_265
> Ubuntu 16.04
> Elastic version of cluster 7.9.1, cross cluster setup
> Parallelism of direct runner 8
>            Reporter: Milan Nikl
>            Priority: P2



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
