[ https://issues.apache.org/jira/browse/BEAM-14064?focusedWorklogId=744280&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-744280 ]

ASF GitHub Bot logged work on BEAM-14064:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Mar/22 17:56
            Start Date: 18/Mar/22 17:56
    Worklog Time Spent: 10m 
      Work Description: egalpin commented on a change in pull request #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r830232322



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -2246,27 +2286,43 @@ public static StatefulBatching fromSpec(BulkIO spec) {
         }
 
         return input
-            .apply(ParDo.of(new Reshuffle.AssignShardFn<>(spec.getMaxParallelRequestsPerWindow())))
+            .apply(ParDo.of(new Reshuffle.AssignShardFn<>(spec.getMaxParallelRequests())))
             .apply(groupIntoBatches);
       }
     }
 
     @Override
     public PCollectionTuple expand(PCollection<Document> input) {
       ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
-
       checkState(connectionConfiguration != null, "withConnectionConfiguration() is required");
 
+      PCollection<Document> docResults;
+      PCollection<Document> globalDocs = input.apply(Window.into(new GlobalWindows()));
+
       if (getUseStatefulBatches()) {
-        return input
-            .apply(StatefulBatching.fromSpec(this))
-            .apply(
-                ParDo.of(new BulkIOStatefulFn(this))
-                    .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
+        docResults =
+            globalDocs
+                .apply(StatefulBatching.fromSpec(this))
+                .apply(ParDo.of(new BulkIOStatefulFn(this)));
       } else {
-        return input.apply(
-            ParDo.of(new BulkIOBundleFn(this))
-                .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
+        docResults = globalDocs.apply(ParDo.of(new BulkIOBundleFn(this)));
+      }
+
+      return docResults
+          .setWindowingStrategyInternal(input.getWindowingStrategy())
+          .apply(
+              ParDo.of(new ResultFilteringFn())
+                  .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
+    }
+
+    private static class ResultFilteringFn extends DoFn<Document, Document> {
+      @ProcessElement
+      public void processElement(@Element Document doc, MultiOutputReceiver out) {
+        if (doc.getHasError()) {
+          out.get(Write.FAILED_WRITES).outputWithTimestamp(doc, doc.getTimestamp());
+        } else {
+          out.get(Write.SUCCESSFUL_WRITES).outputWithTimestamp(doc, doc.getTimestamp());

Review comment:
      You might be completely correct; I definitely don't feel I have a solid grasp just yet. Still, I'll give my interpretation of the root cause of the error that prompted this change, and of how this PR might solve it. I'm also operating under the assumption that the watermark is not advanced between elements of a single bundle, but rather between bundles.
   
   My understanding of the root cause is that it stems from a bundle containing elements which belong to different windows. This can happen even when using GroupIntoBatches (GIB), because a bundle could contain 2 elements where each element is a batch output by GIB. Entities from each element in the bundle are then buffered. If a bundle element contained fewer entities than the maximum allowed to be buffered, `@ProcessElement` completes without producing output. Then another element from the bundle begins processing in `@ProcessElement`, this time belonging to a window "ahead" in time of the prior element's window. Now any buffered entities cannot be output via `outputWithTimestamp`, because their timestamps are invalid for the window of the element currently being processed by `@ProcessElement`. That is, in the original errors the timestamps are not invalid with respect to the pipeline's watermark; they are invalid because data from one window is being output in the context of a different, arbitrary window.
   
   (this is my understanding, not necessarily fact/reality).
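   
   To make that failure mode concrete, here is a minimal, hypothetical DoFn (not the actual ElasticsearchIO code; `BufferingFn` and the flush threshold are made up for illustration) that buffers in `@ProcessElement` and flushes from there:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TimestampedValue;

// Hypothetical illustration of the buffering pattern described above.
class BufferingFn extends DoFn<String, String> {
  private final List<TimestampedValue<String>> buffer = new ArrayList<>();

  @ProcessElement
  public void processElement(ProcessContext ctx) {
    buffer.add(TimestampedValue.of(ctx.element(), ctx.timestamp()));
    if (buffer.size() >= 10) {
      for (TimestampedValue<String> tv : buffer) {
        // If tv was buffered while processing an element from an earlier window,
        // its timestamp can be earlier than ctx.timestamp(); with 0 allowed skew
        // this throws the "Cannot output with timestamp ..." IllegalArgumentException.
        ctx.outputWithTimestamp(tv.getValue(), tv.getTimestamp());
      }
      buffer.clear();
    }
  }
}
```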
   
   This PR might fix that issue by removing the error of outputting with invalid timestamps. _If_ the watermark is held until an entire bundle is processed, then the existing error arises not because the timestamps are invalid relative to the watermark, but strictly because of the window context from which they are being output. By rewindowing into the GlobalWindow before all other processing, the window context should always be the GlobalWindow for every element in a bundle. Then, when we output with a timestamp, the timestamps should no longer fail validation because they fall within the allowable bounds of [watermark, GlobalWindow.MAX_TIMESTAMP). Finally, we re-window back into the original/input windowing strategy so that downstream consumers are not left dealing with everything in the global window.
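   
   Roughly what I have in mind, as a public-API sketch (the names `writeViaGlobalWindow` and `bulkFn` are made up; String stands in for the Document type; the PR itself uses `setWindowingStrategyInternal` rather than a second `Window.into`):

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;

class GlobalWindowWriteSketch {
  // bulkFn stands in for the buffering/flushing bulk-write DoFn;
  // originalWindowFn is the input collection's WindowFn.
  static PCollection<String> writeViaGlobalWindow(
      PCollection<String> docs,
      WindowFn<Object, ?> originalWindowFn,
      DoFn<String, String> bulkFn) {
    // 1. Move everything into the global window so that every element of a bundle
    //    shares the same window context while the bulk DoFn buffers and flushes.
    PCollection<String> globalDocs =
        docs.apply("ToGlobalWindow", Window.into(new GlobalWindows()));

    // 2. Buffer and flush inside the global window: outputWithTimestamp now only
    //    has to fall within [watermark, GlobalWindow.MAX_TIMESTAMP).
    PCollection<String> results = globalDocs.apply("BulkWrite", ParDo.of(bulkFn));

    // 3. Re-window back toward the input's windowing so downstream consumers are
    //    not stuck in the global window.
    return results.apply("RestoreWindows", Window.into(originalWindowFn));
  }
}
```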
   
   Thoughts? Please correct the above assumptions where I have things wrong. CC @lukecwik as they deserve credit for the idea of reifying timestamps and global windowing -> rewindowing as a potential solution.






Issue Time Tracking
-------------------

    Worklog Id:     (was: 744280)
    Time Spent: 3h 50m  (was: 3h 40m)

> ElasticSearchIO#Write buffering and outputting across windows
> -------------------------------------------------------------
>
>                 Key: BEAM-14064
>                 URL: https://issues.apache.org/jira/browse/BEAM-14064
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-elasticsearch
>    Affects Versions: 2.35.0, 2.36.0, 2.37.0
>            Reporter: Luke Cwik
>            Assignee: Evan Galpin
>            Priority: P1
>             Fix For: 2.38.0
>
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Source: https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
> Bug PR: https://github.com/apache/beam/pull/15381
> ElasticsearchIO is collecting results from elements in window X and then 
> trying to output them in window Y when flushing the batch. This exposed a bug 
> where buffered elements were being output as part of a different window than 
> the window that produced them.
> This became visible because validation was added recently to ensure that, when 
> the pipeline is processing elements in window X, any output with a timestamp is 
> valid for window X. Note that this validation only occurs in *@ProcessElement*, 
> since output there is associated with the current window of the input element 
> being processed.
> It is OK to do this in *@FinishBundle*, since there is no existing windowing 
> context and the element you output is assigned to an appropriate window.
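> As a minimal illustration of that difference (not the ElasticsearchIO code; the 
> class and names below are made up, and the global window is chosen only for 
> illustration):
> {noformat}
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
> import org.apache.beam.sdk.values.TimestampedValue;
> import org.joda.time.Instant;
>
> class BufferAndFlushOnFinishFn extends DoFn<String, String> {
>   private final List<TimestampedValue<String>> buffered = new ArrayList<>();
>
>   @ProcessElement
>   public void process(@Element String doc, @Timestamp Instant ts) {
>     // Buffer only; outputting older timestamps here would be checked against
>     // the current element's timestamp/window.
>     buffered.add(TimestampedValue.of(doc, ts));
>   }
>
>   @FinishBundle
>   public void finishBundle(FinishBundleContext ctx) {
>     // No current element/window here: each output explicitly carries its own
>     // timestamp and window, so the @ProcessElement check does not apply.
>     for (TimestampedValue<String> tv : buffered) {
>       ctx.output(tv.getValue(), tv.getTimestamp(), GlobalWindow.INSTANCE);
>     }
>     buffered.clear();
>   }
> }
> {noformat}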
> *Further Context*
> We’ve bisected it to being introduced in 2.35.0, and I’m reasonably certain 
> it’s this PR https://github.com/apache/beam/pull/15381
> Our scenario is pretty trivial: we read off Pubsub and write to Elastic in a 
> streaming job. The config for the source and sink is, respectively:
> {noformat}
> pipeline.apply(
>             PubsubIO.readStrings().fromSubscription(subscription)
>         ).apply(ParseJsons.of(OurObject::class.java))
>             .setCoder(KryoCoder.of())
> {noformat}
> and
> {noformat}
> ElasticsearchIO.write()
>             .withUseStatefulBatches(true)
>             .withMaxParallelRequestsPerWindow(1)
>             .withMaxBufferingDuration(Duration.standardSeconds(30))
>             // 5 bytes -> KiB -> MiB, so 5 MiB
>             .withMaxBatchSizeBytes(5L * 1024 * 1024)
>             // # of docs
>             .withMaxBatchSize(1000)
>             .withConnectionConfiguration(
>                 ElasticsearchIO.ConnectionConfiguration.create(
>                     arrayOf(host),
>                     "fubar",
>                     "_doc"
>                 ).withConnectTimeout(5000)
>                     .withSocketTimeout(30000)
>             )
>             .withRetryConfiguration(
>                 ElasticsearchIO.RetryConfiguration.create(
>                     10,
>                     // the duration is wall clock, against the connection and socket
>                     // timeouts specified above. I.e., 10 x 30s is gonna be more than
>                     // 3 minutes, so if we're getting 10 socket timeouts in a row, this
>                     // would ignore the "10" part and terminate after 6. The idea is
>                     // that in a mixed failure mode, you'd get different timeouts of
>                     // different durations, and on average 10 x fails < 4m.
>                     // That said, 4m is arbitrary, so adjust as and when needed.
>                     Duration.standardMinutes(4)
>                 )
>             )
>             .withIdFn { f: JsonNode -> f["id"].asText() }
>             .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>             .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
> {noformat}
> We recently tried upgrading from 2.33 to 2.36 and immediately hit a bug in the 
> consumer due to alleged time skew, specifically:
> {noformat}
> 2022-03-07 10:48:37.886 GMT Error message from worker: java.lang.IllegalArgumentException: 
> Cannot output with timestamp 2022-03-07T10:43:38.640Z. Output timestamps must be no 
> earlier than the timestamp of the current input (2022-03-07T10:43:43.562Z) minus the 
> allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the 
> DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
> {noformat}
> I’ve bisected it and 2.34 works fine; 2.35 is the first version where this 
> breaks, and it seems like the code in the trace is largely added by the PR 
> linked above. The error usually claims a skew of a few seconds, but obviously I 
> can’t override getAllowedTimestampSkew() on the internal Elastic DoFn, and it’s 
> marked deprecated anyway.
> I’m happy to raise a JIRA, but I’m not 100% sure what the code was intending to 
> fix. I’d also be happy if someone else can reproduce this or knows of similar 
> reports. I feel like what we’re doing is not that uncommon a scenario, so I 
> would have thought someone else would have hit this by now.


