[
https://issues.apache.org/jira/browse/BEAM-14064?focusedWorklogId=742154&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-742154
]
ASF GitHub Bot logged work on BEAM-14064:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 16/Mar/22 09:37
Start Date: 16/Mar/22 09:37
Worklog Time Spent: 10m
Work Description: je-ik commented on a change in pull request #17097:
URL: https://github.com/apache/beam/pull/17097#discussion_r827786188
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -221,6 +221,31 @@ private GroupIntoBatches(BatchingParams<InputT> params) {
duration));
}
+ public GroupIntoBatches<K, InputT> withByteSize(Long batchSizeBytes) {
Review comment:
This probably deserves a javadoc comment. Why do we use big `Long`
instead of `long`?
##########
File path:
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -2335,126 +2270,40 @@ public void setup() throws IOException {
}
}
- @StartBundle
- public void startBundle(StartBundleContext context) {
- batch = ArrayListMultimap.create();
- currentBatchSizeBytes = 0;
- }
-
- /**
- * Adapter interface which provides a common parent for {@link
ProcessContext} and {@link
- * FinishBundleContext} so that we are able to use a single common
invocation to output from.
- */
- interface ContextAdapter {
- void output(
- TupleTag<Document> tag, Document document, Instant timestamp,
BoundedWindow window);
- }
-
- private static final class ProcessContextAdapter<T> implements
ContextAdapter {
- private final DoFn<T, Document>.ProcessContext context;
-
- private ProcessContextAdapter(DoFn<T, Document>.ProcessContext
context) {
- this.context = context;
- }
-
- @Override
- public void output(
- TupleTag<Document> tag, Document document, Instant ignored1,
BoundedWindow ignored2) {
- // Note: window and timestamp are intentionally unused, but required
as params to fit the
- // interface
- context.output(tag, document);
- }
- }
-
- private static final class FinishBundleContextAdapter<T> implements
ContextAdapter {
- private final DoFn<T, Document>.FinishBundleContext context;
-
- private FinishBundleContextAdapter(DoFn<T,
Document>.FinishBundleContext context) {
- this.context = context;
- }
-
- @Override
- public void output(
- TupleTag<Document> tag, Document document, Instant timestamp,
BoundedWindow window) {
- context.output(tag, document, timestamp, window);
+ @ProcessElement
+ public void processElement(ProcessContext c) throws IOException,
InterruptedException {
Review comment:
Can we avoid `ProcessContext` and use `@Element` and `OutputCollector`
for new DoFns? Might be worth changing for the old ones as well, but at least
for the new code, we might want to use the new style.
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -221,6 +221,31 @@ private GroupIntoBatches(BatchingParams<InputT> params) {
duration));
}
+ public GroupIntoBatches<K, InputT> withByteSize(Long batchSizeBytes) {
+ checkArgument(
+ batchSizeBytes != null && batchSizeBytes < Long.MAX_VALUE &&
batchSizeBytes > 0,
+ "batchSizeBytes should be a non-negative value less than " +
Long.MAX_VALUE);
+ return new GroupIntoBatches<>(
+ BatchingParams.create(
+ params.getBatchSize(),
+ batchSizeBytes,
+ params.getElementByteSize(),
+ params.getMaxBufferingDuration()));
+ }
+
+ public GroupIntoBatches<K, InputT> withByteSize(
Review comment:
Please add javadoc, same for `Long` vs `long` as above.
##########
File path:
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -336,11 +333,11 @@ void testWriteWithErrors() throws Exception {
ElasticsearchIO.write()
.withConnectionConfiguration(connectionConfiguration)
.withMaxBatchSize(BATCH_SIZE);
- List<String> input =
- ElasticsearchIOTestUtils.createDocuments(
- numDocs,
ElasticsearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS);
+ // List<String> input =
+ // ElasticsearchIOTestUtils.createDocuments(
+ // numDocs,
ElasticsearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS);
Review comment:
Is this some left-over?
##########
File path:
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -362,11 +359,17 @@ public boolean matches(Object o) {
// write bundles size is the runner decision, we cannot force a bundle
size,
// so we test the Writer as a DoFn outside of a runner.
- try (DoFnTester<Document, Document> fnTester =
- DoFnTester.of(new BulkIO.BulkIOBundleFn(write.getBulkIO()))) {
- // inserts into Elasticsearch
- fnTester.processBundle(serializeDocs(write, input));
- }
+ // try (DoFnTester<KV<Integer, Iterable<Document>>, Document> fnTester =
+ // DoFnTester.of(new BulkIOFn(write.getBulkIO()))) {
+ // // inserts into Elasticsearch
+ // fnTester.processBundle(serializeDocs(write, input));
+ // }
Review comment:
Can we remove the old code?
##########
File path:
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -2335,126 +2270,40 @@ public void setup() throws IOException {
}
}
- @StartBundle
- public void startBundle(StartBundleContext context) {
- batch = ArrayListMultimap.create();
- currentBatchSizeBytes = 0;
- }
-
- /**
- * Adapter interface which provides a common parent for {@link
ProcessContext} and {@link
- * FinishBundleContext} so that we are able to use a single common
invocation to output from.
- */
- interface ContextAdapter {
- void output(
- TupleTag<Document> tag, Document document, Instant timestamp,
BoundedWindow window);
- }
-
- private static final class ProcessContextAdapter<T> implements
ContextAdapter {
- private final DoFn<T, Document>.ProcessContext context;
-
- private ProcessContextAdapter(DoFn<T, Document>.ProcessContext
context) {
- this.context = context;
- }
-
- @Override
- public void output(
- TupleTag<Document> tag, Document document, Instant ignored1,
BoundedWindow ignored2) {
- // Note: window and timestamp are intentionally unused, but required
as params to fit the
- // interface
- context.output(tag, document);
- }
- }
-
- private static final class FinishBundleContextAdapter<T> implements
ContextAdapter {
- private final DoFn<T, Document>.FinishBundleContext context;
-
- private FinishBundleContextAdapter(DoFn<T,
Document>.FinishBundleContext context) {
- this.context = context;
- }
-
- @Override
- public void output(
- TupleTag<Document> tag, Document document, Instant timestamp,
BoundedWindow window) {
- context.output(tag, document, timestamp, window);
+ @ProcessElement
+ public void processElement(ProcessContext c) throws IOException,
InterruptedException {
+ if (c.element() == null || c.element().getValue() == null) {
+ return;
}
- }
-
- @FinishBundle
- public void finishBundle(FinishBundleContext context)
- throws IOException, InterruptedException {
- flushAndOutputResults(new FinishBundleContextAdapter<>(context));
- }
-
- private void flushAndOutputResults(ContextAdapter context)
- throws IOException, InterruptedException {
- // TODO: remove ContextAdapter and Multimap in favour of
MultiOutputReceiver when
- // https://issues.apache.org/jira/browse/BEAM-1287 is completed
- Multimap<BoundedWindow, Document> results = flushBatch();
- for (Entry<BoundedWindow, Document> result : results.entries()) {
- BoundedWindow outputWindow = result.getKey();
- Document outputResult = result.getValue();
- Instant timestamp = outputResult.getTimestamp();
- if (timestamp == null) {
- timestamp = outputWindow.maxTimestamp();
- }
- if (outputResult.getHasError()) {
- context.output(Write.FAILED_WRITES, outputResult, timestamp,
outputWindow);
+ for (Document doc :
flushBatch(Lists.newArrayList(c.element().getValue()))) {
+ if (doc.getHasError()) {
+ c.output(Write.FAILED_WRITES, doc);
} else {
- context.output(Write.SUCCESSFUL_WRITES, outputResult, timestamp,
outputWindow);
+ c.output(Write.SUCCESSFUL_WRITES, doc);
}
}
}
- protected void addAndMaybeFlush(
- Document bulkApiEntity, ProcessContext context, BoundedWindow
outputWindow)
- throws IOException, InterruptedException {
-
- batch.put(outputWindow, bulkApiEntity);
- currentBatchSizeBytes +=
-
bulkApiEntity.getBulkDirective().getBytes(StandardCharsets.UTF_8).length;
-
- if (batch.size() >= spec.getMaxBatchSize()
- || currentBatchSizeBytes >= spec.getMaxBatchSizeBytes()) {
- flushAndOutputResults(new ProcessContextAdapter<>(context));
- }
- }
-
- private boolean isRetryableClientException(Throwable t) {
- // RestClient#performRequest only throws wrapped IOException so we
must inspect the
- // exception cause to determine if the exception is likely transient
i.e. retryable or
- // not.
- return t.getCause() instanceof ConnectTimeoutException
- || t.getCause() instanceof SocketTimeoutException
- || t.getCause() instanceof ConnectionClosedException
- || t.getCause() instanceof ConnectException;
- }
-
- private Multimap<BoundedWindow, Document> flushBatch()
+ private Iterable<Document> flushBatch(List<Document> docs)
throws IOException, InterruptedException {
- if (batch.isEmpty()) {
- return ArrayListMultimap.create();
+ if (docs.isEmpty()) {
+ return Collections.emptyList();
}
- LOG.info(
- "ElasticsearchIO batch size: {}, batch size bytes: {}",
- batch.size(),
- currentBatchSizeBytes);
+ // LOG.info(
+ // "ElasticsearchIO batch size: {}, batch size bytes: {}",
+ // batch.size(),
+ // currentBatchSizeBytes);
Review comment:
Can we remove this code? Or do we want to keep it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 742154)
Time Spent: 0.5h (was: 20m)
> ElasticSearchIO#Write buffering and outputting across windows
> -------------------------------------------------------------
>
> Key: BEAM-14064
> URL: https://issues.apache.org/jira/browse/BEAM-14064
> Project: Beam
> Issue Type: Bug
> Components: io-java-elasticsearch
> Affects Versions: 2.35.0, 2.36.0, 2.37.0
> Reporter: Luke Cwik
> Assignee: Evan Galpin
> Priority: P2
> Fix For: 2.38.0
>
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Source: https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
> Bug PR: https://github.com/apache/beam/pull/15381
> ElasticsearchIO is collecting results from elements in window X and then
> trying to output them in window Y when flushing the batch. This exposed a bug
> where elements that were being buffered were being output as part of a
> different window than what the window that produced them was.
> This became visible because validation was added recently to ensure that when
> the pipeline is processing elements in window X that output with a timestamp
> is valid for window X. Note that this validation only occurs in
> *@ProcessElement* since output is associated with the current window with the
> input element that is being processed.
> It is ok to do this in *@FinishBundle* since there is no existing windowing
> context and when you output that element is assigned to an appropriate window.
> *Further Context*
> We’ve bisected it to being introduced in 2.35.0, and I’m reasonably certain
> it’s this PR https://github.com/apache/beam/pull/15381
> Our scenario is pretty trivial, we read off Pubsub and write to Elastic in a
> streaming job, the config for the source and sink is respectively
> {noformat}
> pipeline.apply(
> PubsubIO.readStrings().fromSubscription(subscription)
> ).apply(ParseJsons.of(OurObject::class.java))
> .setCoder(KryoCoder.of())
> {noformat}
> and
> {noformat}
> ElasticsearchIO.write()
> .withUseStatefulBatches(true)
> .withMaxParallelRequestsPerWindow(1)
> .withMaxBufferingDuration(Duration.standardSeconds(30))
> // 5 bytes **> KiB **> MiB, so 5 MiB
> .withMaxBatchSizeBytes(5L * 1024 * 1024)
> // # of docs
> .withMaxBatchSize(1000)
> .withConnectionConfiguration(
> ElasticsearchIO.ConnectionConfiguration.create(
> arrayOf(host),
> "fubar",
> "_doc"
> ).withConnectTimeout(5000)
> .withSocketTimeout(30000)
> )
> .withRetryConfiguration(
> ElasticsearchIO.RetryConfiguration.create(
> 10,
> // the duration is wall clock, against the connection and
> socket timeouts specified
> // above. I.e., 10 x 30s is gonna be more than 3 minutes,
> so if we're getting
> // 10 socket timeouts in a row, this would ignore the
> "10" part and terminate
> // after 6. The idea is that in a mixed failure mode,
> you'd get different timeouts
> // of different durations, and on average 10 x fails < 4m.
> // That said, 4m is arbitrary, so adjust as and when
> needed.
> Duration.standardMinutes(4)
> )
> )
> .withIdFn { f: JsonNode -> f["id"].asText() }
> .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
> .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") ==
> "delete" }
> {noformat}
> We recently tried upgrading 2.33 to 2.36 and immediately hit a bug in the
> consumer, due to alleged time skew, specifically
> {noformat}
> 2022-03-07 10:48:37.886 GMTError message from worker:
> java.lang.IllegalArgumentException: Cannot output with timestamp
> 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the
> timestamp of the
> current input (2022-03-07T10:43:43.562Z) minus the allowed skew (0
> milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the
> DoFn#getAllowedTimestampSkew() Javadoc
> for details on changing the allowed skew.
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
> {noformat}
> I’ve bisected it and 2.34 works fine, 2.35 is the first version this breaks,
> and it seems like the code in the trace is largely added by the PR linked
> above. The error usually claims a skew of a few seconds, but obviously I
> can’t override getAllowedTimestampSkew() on the internal Elastic DoFn, and
> it’s marked deprecated anyway.
> I’m happy to raise a JIRA but I’m not 100% sure what the code was intending
> to fix, and additionally, I’d also be happy if someone else can reproduce
> this or knows of similar reports. I feel like what we’re doing is not that
> uncommon a scenario, so I would have thought someone else would have hit this
> by now.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)