[
https://issues.apache.org/jira/browse/BEAM-2952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16164085#comment-16164085
]
Luke Cwik commented on BEAM-2952:
---------------------------------
KV.OrderByKey is meant only as a comparator; it is not used to create ordered
PCollections.
There is an example of usage here:
https://github.com/apache/beam/blob/v2.1.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java#L255
It only works because it effectively combines globally over all the keys and
produces a single element with a fixed number of keys in a specific order.
That element must fit in the memory of a single machine.
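As a rough illustration of that pattern, here is a minimal sketch that runs
without Beam on the classpath. It uses java.util.Map.Entry as a stand-in for KV,
and the class OrderByKeySketch and the helpers orderByKey() and smallestByKey()
are hypothetical names, not part of the Beam API. It is not the code in
Sample.java; it only shows a key comparator driving a bounded, in-memory
combine whose result is one ordered list.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class OrderByKeySketch {

  // Hypothetical stand-in for KV.OrderByKey: compares two pairs by key only.
  // With Beam available, the comparator would be
  // new KV.OrderByKey<Integer, Double>() over KV<Integer, Double>.
  public static <K extends Comparable<? super K>, V>
      Comparator<Map.Entry<K, V>> orderByKey() {
    return (a, b) -> a.getKey().compareTo(b.getKey());
  }

  // Keeps at most `limit` pairs with the smallest keys and returns them as a
  // single list sorted by key. Everything lives in memory on one machine.
  public static <K extends Comparable<? super K>, V>
      List<Map.Entry<K, V>> smallestByKey(Iterable<Map.Entry<K, V>> input, int limit) {
    Comparator<Map.Entry<K, V>> byKey = orderByKey();
    // Reversed order makes the heap head the largest retained key,
    // so that pair is evicted first when the bound is exceeded.
    PriorityQueue<Map.Entry<K, V>> heap = new PriorityQueue<>(byKey.reversed());
    for (Map.Entry<K, V> entry : input) {
      heap.add(entry);
      if (heap.size() > limit) {
        heap.poll();
      }
    }
    List<Map.Entry<K, V>> result = new ArrayList<>(heap);
    result.sort(byKey);
    return result;
  }

  public static void main(String[] args) {
    int[] key = {2, 1, 3, 4, 5};
    double[] value = {1.0, 1.0, 1.0, 1.0, 1.0};
    List<Map.Entry<Integer, Double>> pairs = new ArrayList<>();
    for (int i = 0; i < key.length; i++) {
      pairs.add(new SimpleEntry<>(key[i], value[i]));
    }
    // Prints the three pairs with the smallest keys, in key order.
    System.out.println(smallestByKey(pairs, 3));
  }
}
```

The comparator never touches a PCollection here; it only orders values that
are already collected in one place, which is why the result is bounded by a
single machine's memory.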
You may want to look at
https://github.com/apache/beam/blob/v2.1.0/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
for sorting of PCollections. Note that the implementation currently assumes
that it can fit all the records into memory. It's useful for very small, specific
cases but cannot be generalized.
Finally, this feels like a question and would have been better asked on the
Apache Beam user mailing list ([email protected]) than opened as an issue.
If you believe that your request is not a question, please re-open
and reword your request to represent a task that would enhance Apache Beam in
some way.
> How to use KV.OrderByKey
> ------------------------
>
> Key: BEAM-2952
> URL: https://issues.apache.org/jira/browse/BEAM-2952
> Project: Beam
> Issue Type: New Feature
> Components: examples-java
> Reporter: Rick Lin
> Assignee: Reuven Lax
>
> Hi all,
> I have a question about how to use KV.OrderByKey in the Beam Java SDK.
> My Java code is as follows:
> int[] key=new int[] {2,1,3,4,5};
> double[] value=new double[] {1.0,1.0,1.0,1.0,1.0};
> List<KV<Integer, Double>> KVlist = new ArrayList<>();
> List<KV<Integer, Double>> KVtest = new ArrayList<>();
> int n=value.length;
> for (int i=0; i<n; i++){
>   KVlist.add(KV.of(key[i], value[i]));
>   System.out.println(KVlist.get(i));
> }
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
> PCollection<KV<Integer, Double>> t1=p.apply("create data",Create.of(KVlist));
> p.run();
> Thanks
> Rick