[
https://issues.apache.org/jira/browse/BEAM-3247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16267875#comment-16267875
]
ASF GitHub Bot commented on BEAM-3247:
--------------------------------------
jkff commented on a change in pull request #4175: [BEAM-3247] fix Sample.any
performance
URL: https://github.com/apache/beam/pull/4175#discussion_r153365677
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
##########
@@ -209,29 +201,49 @@ public void populateDisplayData(DisplayData.Builder
builder) {
}
/**
- * A {@link DoFn} that returns up to limit elements from the side input
PCollection.
+ * A {@link CombineFn} that combines into a {@link List} of up to limit
elements.
*/
- private static class SampleAnyDoFn<T> extends DoFn<Void, T> {
- long limit;
- final PCollectionView<Iterable<T>> iterableView;
+ private static class SampleAnyCombineFn<T> extends CombineFn<T, List<T>,
Iterable<T>> {
+ private final long limit;
- public SampleAnyDoFn(long limit, PCollectionView<Iterable<T>>
iterableView) {
+ private SampleAnyCombineFn(long limit) {
this.limit = limit;
- this.iterableView = iterableView;
}
- @ProcessElement
- public void processElement(ProcessContext c) {
- for (T i : c.sideInput(iterableView)) {
- if (limit-- <= 0) {
- break;
+ @Override
+ public List<T> createAccumulator() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public List<T> addInput(List<T> accumulator, T input) {
+ if (accumulator.size() < limit) {
+ accumulator.add(input);
+ }
+ return accumulator;
+ }
+
+ @Override
+ public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+ List<T> merged = new ArrayList<>();
+ for (List<T> accumulator : accumulators) {
+ for (T t : accumulator) {
+ merged.add(t);
+ if (merged.size() >= limit) {
+ return merged;
+ }
}
- c.output(i);
}
+ return merged;
+ }
+
+ @Override
+ public Iterable<T> extractOutput(List<T> accumulator) {
+ return accumulator;
}
}
- /**
+ /**
Review comment:
Accidental change
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Sample.any memory constraint
> ----------------------------
>
> Key: BEAM-3247
> URL: https://issues.apache.org/jira/browse/BEAM-3247
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Affects Versions: 2.1.0
> Reporter: Neville Li
> Assignee: Neville Li
> Priority: Minor
>
> Right now {{Sample.any}} converts the collection to an iterable view and take
> first n in a side input. This may require materializing the entire collection
> to disk and is potentially inefficient.
> https://github.com/apache/beam/blob/v2.1.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java#L74
> It can be fixed by applying a truncating `DoFn` first, then a combine into
> `List<T>` which limits the list size, and finally flattening the list.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)