Ahh!! I think it wasn't as crisp, JoshJ.

What I mean to say that Sort API enforces natural ordering which was insufficient in this case. So I took different ways to solve the problem. Both the solutions gave expected results.

In the first solution, I grouped the results a couple of times and the sorted them in the ParallelDo method of a PCollection using a comparator. I wasn't able to use any of the Sorting features available in Crunch.

In the second solution I think sorting of keys gets invokedJ (I am still not sure, but if I look code then it looks like this). But for doing this I have to implement a bunch of things. The NameCompararable class used here implemented WritableComparable, which provided compareTo method for sorting. It wasn't binary comparison as in Text class.

The second solution looked more appealing to me than the first one, where I shuffled data a couple of times. But I had to implement a bunch of things to make it work all of which is a copy paste from Crunch existing code. I was thinking if I could pass a /Comparator/ in the Crunch Sort API that can solve the problemJ that could be a better way of solving this issue.

Also from a performance standpoint, what do you think which one would perform better ?

Actually I am from data quality business and we have applications that process people and company identities. I was thinking of using TupleN and sorting them in different manners. Then I implemented a simple use-case over it. Attached is my test case having both solutions

J
On 18-07-2012 18:21, Josh Wills wrote:
Hey Rahul,

I don't quite follow-- why did the Sort API work in the second case, but
not the first? org.apache.hadoop.io.Text, the underlying Writable type for
Strings, is also WritableComparable. Did NamesComparable do something that
Text does not?

J

On Wed, Jul 18, 2012 at 3:28 AM, Rahul <[email protected]> wrote:

I am trying to  sort some data. The data had names and I was try to sort
in the following manner.

*ORIGINAL DATA* *  SORTED DATA*
/Rahul                                               shekhar/
/rahul                                                Sameer/
/RAHUL              =====                     rahul/
/shekar               =====                     Rahul/
/hans                                                 RAHul/
/kasper                                              kasper/
/Sameer                                             hans/
/
/
This was a bit customized Sorting where I wanted to first sort them in
lexicographic manner and then maybe take capitalization also into
consideration.
Initially I was trying with the Sort API but was unsuccessful with that.
But then I tried in a couple of ways as explained below :

In the first solution, I outputted each of the names them against their
starting character in a /Ptable/. Then collected all the values for a
particular key.
After that I selected all the values and then used a /Comparator /to sort
data in each of the collection.

  /PTable<String, String> classifiedData = count.parallelDo( new
NamesClassification(),**Writables.tableOf(Writables.**
strings(),Writables.strings())**);
  PTable<String, Collection<String> collectedValues =
classifiedData.collectValues()**;
  PCollection<Collection<String> names = collectedValues.values();
  PCollection<Collection<String>**> sortedNames = names.parallelDo("names
Sorting",new NamesSorting(), Writables.collections(**
Writables.strings()));/


Not completely convinced with the path I took. I spend some time of
solving it and found another way of doing same.
In the second solution, I created my own writable type that implemented
WritableComparable. Also implemented all the mapping functions for the
same, so that it can be used with crunch WritableTypes.

/class NamesComparable implements WritableComparable<**NamesComparable>{
......}

MapFn<String,//**NamesComparable//> string_to_names =.........
MapFn<//NamesComparable,**String//> names_to_string =........./

/
/
Then  I used this while converting the read data into it and then sorting
it.

     PCollection<String> readLines = pipeline.readTextFile(fileLoc)**;
     PCollection<String> lines = readLines.parallelDo(new DoFn<String,
String>() {
       @Override
      public void process(String input, Emitter<String> emitter) {
emitter.emit(input);}},
      *stringToNames*());

     PCollection<String> sortedData = Sort.sort(lines, Order.DESCENDING);


I found of these methods as quite tricky that give a feeling of going
around a bush. Is there a better way of accomplishing the same ? Have I
missed some aspects ?
If not, then  I believe there is scope of having an Sorting API that can
have support of some customizations.

regards
Rahul




package com.mylearning.crunch;

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;

import org.apache.commons.io.FileUtils;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Sort;
import org.apache.crunch.lib.Sort.Order;
import org.apache.crunch.test.FileHelper;
import org.apache.crunch.types.writable.WritableType;
import org.apache.crunch.types.writable.Writables;
import org.junit.Test;

public class SortNamesTest implements Serializable {

  private static final MapFn<NamesComparable, String> TEXT_TO_STRING = new 
MapFn<NamesComparable, String>() {
    @Override
    public String map(NamesComparable input) {
      return input.toString();
    }
  };

  private static final MapFn<String, NamesComparable> STRING_TO_TEXT = new 
MapFn<String, NamesComparable>() {
    @Override
    public NamesComparable map(String input) {
      return new NamesComparable(input);
    }
  };

  public static final WritableType<String, NamesComparable> stringToNames() {
    return new WritableType<String, NamesComparable>(String.class, 
NamesComparable.class,
        TEXT_TO_STRING, STRING_TO_TEXT);
  }

  @Test
  public void shouldSortNamesv2() throws Exception {
    String fileLoc = FileHelper.createTempCopyOf("person-names.txt");
    Pipeline pipeline = new MRPipeline(SortNamesTest.class);
    PCollection<String> readLines = pipeline.readTextFile(fileLoc);
    PCollection<String> lines = readLines.parallelDo(new DoFn<String, String>() 
{
      @Override
      public void process(String input, Emitter<String> emitter) {
        emitter.emit(input);
      }
    }, stringToNames());
    PCollection<String> sortedData = Sort.sort(lines, Order.DESCENDING);

    Iterator<String> materializeItr = sortedData.materialize().iterator();
    while (materializeItr.hasNext()) {
      System.out.println(materializeItr.next());
    }
  }

  @Test
  public void shouldSortNamesv1() throws Exception {
    String fileLoc = FileHelper.createTempCopyOf("person-names.txt");
    Pipeline pipeline = new MRPipeline(SortNamesTest.class);
    PCollection<String> readLines = pipeline.readTextFile(fileLoc);

    PTable<String, String> data = readLines.parallelDo(new DoFn<String, 
Pair<String, String>>() {

      @Override
      public void process(String input, Emitter<Pair<String, String>> emitter) {
        emitter.emit(new Pair<String, String>("" + 
input.toLowerCase().charAt(0), input));
      }
    }, Writables.tableOf(Writables.strings(), Writables.strings()));
    PTable<String, Collection<String>> collectValues = data.collectValues();
    PCollection<Collection<String>> values = collectValues.values();
    PCollection<Collection<String>> sortedNames = values.parallelDo(
        new DoFn<Collection<String>, Collection<String>>() {

          @Override
          public void process(Collection<String> input, 
Emitter<Collection<String>> emitter) {
            ArrayList<String> list = new ArrayList<String>(input);
            Collections.sort(list, new Comparator<String>() {

              @Override
              public int compare(String o1, String o2) {
                return o1.compareToIgnoreCase(o2);
              }
            });
            emitter.emit(input);

          }
        }, Writables.collections(Writables.strings()));

    Iterator<Collection<String>> materializedColl = 
sortedNames.materialize().iterator();
    int fileCount = 1;
    while (materializedColl.hasNext()) {
      Collection<String> next = materializedColl.next();
      FileUtils.writeLines(new File("/home/rahul/crunchOut", "File-" + 
fileCount + ".txt"),
          "utf-8", next);
      fileCount++;
    }
  }
}

Reply via email to