[ https://issues.apache.org/jira/browse/FLINK-138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Fabian Hueske closed FLINK-138.
-------------------------------
Resolution: Duplicate
Duplicate of FLINK-146
> WordCount total ordering not working?
> -------------------------------------
>
> Key: FLINK-138
> URL: https://issues.apache.org/jira/browse/FLINK-138
> Project: Flink
> Issue Type: Bug
> Reporter: GitHub Import
> Labels: github-import
> Fix For: pre-apache
>
>
> Hi everybody,
> I tried to slightly modify the WordCount example to obtain a single output
> file ordered by word occurrence, but I always get an error.
> This is the plan of my WordCount:
> ```java
> @Override
> public Plan getPlan(String... args) {
>     // parse job parameters
>     int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
>     String dataInput = (args.length > 1 ? args[1] : "");
>     String output = (args.length > 2 ? args[2] : "");
>
>     FileDataSource source = new FileDataSource(new TextInputFormat(), dataInput, "Input Lines");
>     // comment out this line for UTF-8 inputs
>     // source.setParameter(TextInputFormat.CHARSET_NAME, "ASCII");
>     MapContract mapper = MapContract.builder(new TokenizeLine())
>         .input(source)
>         .name("Tokenize Lines")
>         .build();
>     ReduceContract reducer = ReduceContract.builder(CountWords.class, PactString.class, 0)
>         .input(mapper)
>         .name("Count Words")
>         .build();
>     reducer.setDegreeOfParallelism(1);
>
>     FileDataSink out = new FileDataSink(new RecordOutputFormat(), output, reducer, "Word Counts");
>     RecordOutputFormat.configureRecordFormat(out)
>         .recordDelimiter('\n')
>         .fieldDelimiter(' ')
>         .field(PactString.class, 0)
>         .field(PactInteger.class, 1);
>     // sets the group sorting to the second field
>     out.setLocalOrder(new Ordering(1, PactInteger.class, Order.DESCENDING));
>
>     Plan plan = new Plan(out, "WordCount Example");
>     plan.setDefaultParallelism(numSubTasks);
>     return plan;
> }
> ```
> The error I receive is:
> java.lang.Exception: The data preparation for task 'Count Words' , caused an error: Invalid local strategy provided for CombineTask.
>     at eu.stratosphere.pact.runtime.task.RegularPactTask.run(RegularPactTask.java:348)
>     at eu.stratosphere.pact.runtime.task.RegularPactTask.invoke(RegularPactTask.java:292)
>     at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:344)
>     at java.lang.Thread.run(Thread.java:724)
> Caused by: java.lang.RuntimeException: Invalid local strategy provided for CombineTask.
>     at eu.stratosphere.pact.runtime.task.CombineDriver.prepare(CombineDriver.java:134)
>     at eu.stratosphere.pact.runtime.task.RegularPactTask.run(RegularPactTask.java:343)
> Best,
> Flavio
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/138
> Created by: [fpompermaier|https://github.com/fpompermaier]
> Labels: bug, core,
> Milestone: Release 0.4
> Assignee: [fhueske|https://github.com/fhueske]
> Created at: Wed Oct 09 17:39:36 CEST 2013
> State: open
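For reference, the outcome the report asks for (one list of word counts, globally ordered by descending occurrence) can be sketched in plain Java without any Stratosphere API. This only illustrates the expected result on a single machine and is not a fix for the error quoted above. The class and method names (TotalOrderWordCount, countAndSort) are made up for this sketch, and the tie-breaking rule (alphabetical by word) is an assumption the report does not state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Stratosphere/Flink.
class TotalOrderWordCount {

    // Counts whitespace-separated, lower-cased words and returns the
    // (word, count) pairs ordered by descending count.
    // Assumption: ties are broken alphabetically by word.
    public static List<Map.Entry<String, Integer>> countAndSort(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }

        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(counts.entrySet());
        sorted.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        return sorted;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("to be or not to be", "be quick");
        // Same record layout as the sink in the report: word, space, count.
        for (Map.Entry<String, Integer> e : countAndSort(lines)) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

Because all counts are sorted in one place, the output is totally ordered, which is what a single output file ordered by word occurrence requires.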
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)