Ahh!! I think my explanation wasn't crisp enough, Josh.
What I meant to say is that the Sort API enforces natural ordering, which
was insufficient in this case. So I tried two different ways to solve the
problem. Both solutions gave the expected results.
In the first solution, I grouped the results a couple of times and then
sorted them in the parallelDo method of a PCollection using a
comparator. I wasn't able to use any of the sorting features available
in Crunch.
In the second solution, I think the sorting of keys gets invoked (I am
still not sure, but from reading the code it looks that way). But to
do this I had to implement a bunch of things.
The NamesComparable class used here implements WritableComparable,
which provides the compareTo method used for sorting. It is not a binary
comparison as in the Text class.
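To make the difference from a binary comparison concrete, here is a minimal sketch of what such a type could look like. It is not the attached NamesComparable: the compareTo logic is an assumption, and to keep the sketch runnable without Hadoop on the classpath it declares the same write/readFields/compareTo methods against java.lang.Comparable instead of implementing WritableComparable. The names NamesComparableSketch and roundTrip are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for a WritableComparable: same three methods,
// but without the Hadoop interface so it compiles on its own.
class NamesComparableSketch implements Comparable<NamesComparableSketch> {
    private String name = "";

    NamesComparableSketch() {}  // no-arg constructor, as Hadoop expects for deserialization

    NamesComparableSketch(String name) { this.name = name; }

    public void write(DataOutput out) throws IOException { out.writeUTF(name); }

    public void readFields(DataInput in) throws IOException { name = in.readUTF(); }

    // Assumed ordering: letters first (ignoring case), exact case only breaks ties.
    @Override
    public int compareTo(NamesComparableSketch other) {
        int byLetters = name.compareToIgnoreCase(other.name);
        return byLetters != 0 ? byLetters : name.compareTo(other.name);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof NamesComparableSketch
            && name.equals(((NamesComparableSketch) o).name);
    }

    @Override
    public int hashCode() { return name.hashCode(); }

    @Override
    public String toString() { return name; }

    // Serializes and deserializes a value, as would happen during a shuffle.
    static NamesComparableSketch roundTrip(NamesComparableSketch in) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            in.write(new DataOutputStream(buffer));
            NamesComparableSketch copy = new NamesComparableSketch();
            copy.readFields(new DataInputStream(
                new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

With this ordering "rahul" sorts before "Sameer", whereas the plain String.compareTo result (which follows character codes, much like a byte-wise comparison) puts "Sameer" first because 'S' is smaller than 'r'.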
The second solution looked more appealing to me than the first one,
where I shuffled the data a couple of times. But I had to implement a
bunch of things to make it work, all of which is copied and pasted from
existing Crunch code. I was wondering whether I could pass a /Comparator/
to the Crunch Sort API to solve the problem; that could be a better way
of solving this issue.
Also, from a performance standpoint, which one do you think would
perform better?
Actually I am from the data quality business, and we have applications
that process people and company identities. I was thinking of using TupleN
and sorting them in different manners. Then I implemented a simple
use case over it. Attached is my test case with both solutions.
On 18-07-2012 18:21, Josh Wills wrote:
Hey Rahul,
I don't quite follow-- why did the Sort API work in the second case, but
not the first? org.apache.hadoop.io.Text, the underlying Writable type for
Strings, is also WritableComparable. Did NamesComparable do something that
Text does not?
J
On Wed, Jul 18, 2012 at 3:28 AM, Rahul <[email protected]> wrote:
I am trying to sort some data. The data has names, and I was trying to
sort it in the following manner.
*ORIGINAL DATA*          *SORTED DATA*
Rahul                    shekhar
rahul                    Sameer
RAHUL         =====      rahul
shekar        =====      Rahul
hans                     RAHul
kasper                   kasper
Sameer                   hans
This was a somewhat customized sort, where I wanted to sort the names
lexicographically first and then maybe take capitalization into
consideration as well.
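For reference, the ordering described above can be sketched as a plain Java comparator, independent of Crunch. This is only an illustration, assuming that "lexicographic first, then capitalization" means a case-insensitive comparison with an exact-case tie-break; NameOrdering, BY_NAME_THEN_CASE and sortDescending are made-up names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class NameOrdering {
    // Letters first (ignoring case); exact-case comparison only breaks ties.
    static final Comparator<String> BY_NAME_THEN_CASE = new Comparator<String>() {
        @Override
        public int compare(String a, String b) {
            int byLetters = a.compareToIgnoreCase(b);
            return byLetters != 0 ? byLetters : a.compareTo(b);
        }
    };

    // Returns a new list in descending order; the input list is left untouched.
    static List<String> sortDescending(List<String> names) {
        List<String> sorted = new ArrayList<String>(names);
        Collections.sort(sorted, Collections.reverseOrder(BY_NAME_THEN_CASE));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
            "Rahul", "rahul", "RAHUL", "shekar", "hans", "kasper", "Sameer");
        // Prints [shekar, Sameer, rahul, Rahul, RAHUL, kasper, hans]
        System.out.println(sortDescending(names));
    }
}
```

On the seven input names this gives the same shape as the sorted column: names grouped regardless of case, with the lower-case variant first inside each group when sorting in descending order.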
Initially I tried the Sort API but was unsuccessful with it.
Then I tried a couple of other ways, as explained below:
In the first solution, I emitted each name keyed by its starting
character into a /PTable/. Then I collected all the values for each
key.
After that I took all the values and used a /Comparator/ to sort the
data in each collection.
PTable<String, String> classifiedData = count.parallelDo(
    new NamesClassification(),
    Writables.tableOf(Writables.strings(), Writables.strings()));
PTable<String, Collection<String>> collectedValues =
    classifiedData.collectValues();
PCollection<Collection<String>> names = collectedValues.values();
PCollection<Collection<String>> sortedNames = names.parallelDo(
    "names Sorting", new NamesSorting(),
    Writables.collections(Writables.strings()));
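The group-then-sort idea of this first solution can be tried out on plain Java collections, without a pipeline. The sketch below is only an in-memory analogy and not Crunch code: GroupThenSort and groupAndSort are made-up names, the TreeMap is my own choice so that the groups print in key order, and the per-group comparator is assumed to be case-insensitive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class GroupThenSort {
    // Groups names by lower-cased first character, then sorts each group on its own.
    static Map<String, List<String>> groupAndSort(List<String> names) {
        Map<String, List<String>> groups = new TreeMap<String, List<String>>();
        for (String name : names) {
            if (name.isEmpty()) {
                continue;  // an empty line has no first character to key on
            }
            String key = "" + name.toLowerCase().charAt(0);
            List<String> group = groups.get(key);
            if (group == null) {
                group = new ArrayList<String>();
                groups.put(key, group);
            }
            group.add(name);
        }
        for (List<String> group : groups.values()) {
            // Stable sort: names that differ only in case keep their input order.
            Collections.sort(group, String.CASE_INSENSITIVE_ORDER);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
            "Rahul", "rahul", "RAHUL", "shekar", "hans", "kasper", "Sameer");
        // Prints {h=[hans], k=[kasper], r=[Rahul, rahul, RAHUL], s=[Sameer, shekar]}
        System.out.println(groupAndSort(names));
    }
}
```

Each group is ordered only internally, so an overall order exists only if the groups are then read back in key order, and every name sharing a first letter ends up in the same collection.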
I was not completely convinced by the path I took. I spent some time on
it and found another way of doing the same thing.
In the second solution, I created my own writable type that implements
WritableComparable. I also implemented the mapping functions for it,
so that it can be used with Crunch WritableTypes.
class NamesComparable implements WritableComparable<NamesComparable> {
......}
MapFn<String, NamesComparable> string_to_names = .........
MapFn<NamesComparable, String> names_to_string = .........
Then I used this type to convert the data as it was read, and sorted the
result.
PCollection<String> readLines = pipeline.readTextFile(fileLoc);
PCollection<String> lines = readLines.parallelDo(new DoFn<String, String>() {
  @Override
  public void process(String input, Emitter<String> emitter) {
    emitter.emit(input);
  }
}, stringToNames());
PCollection<String> sortedData = Sort.sort(lines, Order.DESCENDING);
I found both of these methods quite tricky; they give a feeling of
beating around the bush. Is there a better way of accomplishing the same
thing? Have I missed something?
If not, then I believe there is scope for a sorting API that supports
some customization.
regards
Rahul
package com.mylearning.crunch;
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import org.apache.commons.io.FileUtils;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Sort;
import org.apache.crunch.lib.Sort.Order;
import org.apache.crunch.test.FileHelper;
import org.apache.crunch.types.writable.WritableType;
import org.apache.crunch.types.writable.Writables;
import org.junit.Test;
public class SortNamesTest implements Serializable {

  // Converts the custom writable back to a String when data is read.
  private static final MapFn<NamesComparable, String> TEXT_TO_STRING =
      new MapFn<NamesComparable, String>() {
        @Override
        public String map(NamesComparable input) {
          return input.toString();
        }
      };

  // Wraps a String in the custom writable when data is written.
  private static final MapFn<String, NamesComparable> STRING_TO_TEXT =
      new MapFn<String, NamesComparable>() {
        @Override
        public NamesComparable map(String input) {
          return new NamesComparable(input);
        }
      };

  public static final WritableType<String, NamesComparable> stringToNames() {
    return new WritableType<String, NamesComparable>(String.class,
        NamesComparable.class, TEXT_TO_STRING, STRING_TO_TEXT);
  }

  // Second solution: sort through the custom WritableComparable type.
  @Test
  public void shouldSortNamesv2() throws Exception {
    String fileLoc = FileHelper.createTempCopyOf("person-names.txt");
    Pipeline pipeline = new MRPipeline(SortNamesTest.class);
    PCollection<String> readLines = pipeline.readTextFile(fileLoc);
    PCollection<String> lines = readLines.parallelDo(new DoFn<String, String>() {
      @Override
      public void process(String input, Emitter<String> emitter) {
        emitter.emit(input);
      }
    }, stringToNames());
    PCollection<String> sortedData = Sort.sort(lines, Order.DESCENDING);
    Iterator<String> materializeItr = sortedData.materialize().iterator();
    while (materializeItr.hasNext()) {
      System.out.println(materializeItr.next());
    }
  }

  // First solution: group by first character, then sort each group.
  @Test
  public void shouldSortNamesv1() throws Exception {
    String fileLoc = FileHelper.createTempCopyOf("person-names.txt");
    Pipeline pipeline = new MRPipeline(SortNamesTest.class);
    PCollection<String> readLines = pipeline.readTextFile(fileLoc);
    // Key each name by its lower-cased first character.
    PTable<String, String> data = readLines.parallelDo(
        new DoFn<String, Pair<String, String>>() {
          @Override
          public void process(String input, Emitter<Pair<String, String>> emitter) {
            emitter.emit(new Pair<String, String>(
                "" + input.toLowerCase().charAt(0), input));
          }
        }, Writables.tableOf(Writables.strings(), Writables.strings()));
    PTable<String, Collection<String>> collectValues = data.collectValues();
    PCollection<Collection<String>> values = collectValues.values();
    PCollection<Collection<String>> sortedNames = values.parallelDo(
        new DoFn<Collection<String>, Collection<String>>() {
          @Override
          public void process(Collection<String> input,
              Emitter<Collection<String>> emitter) {
            ArrayList<String> list = new ArrayList<String>(input);
            Collections.sort(list, new Comparator<String>() {
              @Override
              public int compare(String o1, String o2) {
                return o1.compareToIgnoreCase(o2);
              }
            });
            // Emit the sorted copy; emitting input would discard the sort.
            emitter.emit(list);
          }
        }, Writables.collections(Writables.strings()));
    Iterator<Collection<String>> materializedColl =
        sortedNames.materialize().iterator();
    int fileCount = 1;
    while (materializedColl.hasNext()) {
      Collection<String> next = materializedColl.next();
      FileUtils.writeLines(
          new File("/home/rahul/crunchOut", "File-" + fileCount + ".txt"),
          "utf-8", next);
      fileCount++;
    }
  }
}