http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
new file mode 100644
index 0000000..1578fd0
--- /dev/null
+++ 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.relational;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * This program filters out lines of a CSV file that contain empty fields. In doing so, it counts the number of
+ * empty fields per column within the CSV file using a custom accumulator for vectors. In this context, empty fields
+ * are those that contain at most whitespace characters like space and tab.
+ * <p>
+ * The input file is a plain text CSV file with the semicolon as field 
separator and double quotes as field delimiters
+ * and three columns. See {@link #getDataSet(ExecutionEnvironment)} for 
configuration.
+ * <p>
+ * Usage: <code>EmptyFieldsCountAccumulator [&lt;input file path&gt; [&lt;result path&gt;]]</code> <br>
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>custom accumulators
+ * <li>tuple data types
+ * <li>inline-defined functions
+ * </ul>
+ */
+@SuppressWarnings("serial")
+public class EmptyFieldsCountAccumulator {
+
+       // 
*************************************************************************
+       // PROGRAM
+       // 
*************************************************************************
+
+       private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
+
+       /**
+        * Entry point: reads the input tuples, drops every tuple containing an empty field and
+        * reports how many empty fields were seen per column.
+        *
+        * @param args optional input file path, optionally followed by a result path
+        * @throws Exception if setting up or running the job fails
+        */
+       public static void main(final String[] args) throws Exception {
+
+               // bail out when the command line could not be interpreted
+               final boolean parametersValid = parseParameters(args);
+               if (!parametersValid) {
+                       return;
+               }
+
+               final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+               // load the tuples and keep only those without empty fields
+               final DataSet<Tuple> input = getDataSet(environment);
+               final DataSet<Tuple> completeLines = input.filter(new EmptyFieldFilter());
+
+               // Here, we could do further processing with the filtered lines...
+
+               final JobExecutionResult jobResult;
+               if (outputPath != null) {
+                       // a result path was given: write CSV there and run the program explicitly
+                       completeLines.writeAsCsv(outputPath);
+                       jobResult = environment.execute("Accumulator example");
+               } else {
+                       // no result path: print the lines and pick up the result of the last executed job
+                       completeLines.print();
+                       jobResult = environment.getLastJobExecutionResult();
+               }
+
+               // look the accumulator up under the key it was registered with
+               final List<Integer> emptyFieldsPerColumn = jobResult.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
+               System.out.format("Number of detected empty fields per column: %s\n", emptyFieldsPerColumn);
+       }
+
+       // 
*************************************************************************
+       // UTIL METHODS
+       // 
*************************************************************************
+
+       private static String filePath;
+       private static String outputPath;
+
+       /**
+        * Interprets the command line: an optional input file path, optionally followed by a
+        * result path. Arguments that are not given leave the corresponding static field untouched.
+        *
+        * @param programArguments the raw command-line arguments
+        * @return {@code false} if three or more arguments were passed (a usage hint is printed to
+        *         stderr), {@code true} otherwise
+        */
+       private static boolean parseParameters(final String[] programArguments) {
+
+               if (programArguments.length >= 3) {
+                       // the usage hint names this class, which is the program the user actually starts
+                       System.err.println("Usage: EmptyFieldsCountAccumulator [<input file path> [<result path>]]");
+                       return false;
+               }
+
+               if (programArguments.length >= 1) {
+                       filePath = programArguments[0];
+                       if (programArguments.length == 2) {
+                               outputPath = programArguments[1];
+                       }
+               }
+
+               return true;
+       }
+
+       @SuppressWarnings("unchecked")
+       private static DataSet<Tuple> getDataSet(final ExecutionEnvironment 
env) {
+
+               DataSet<? extends Tuple> source;
+               if (filePath == null) {
+                       source = env.fromCollection(getExampleInputTuples());
+
+               } else {
+                       source = env
+                                       .readCsvFile(filePath)
+                                       .fieldDelimiter(";")
+                                       .types(String.class, String.class, 
String.class);
+
+               }
+
+               return (DataSet<Tuple>) source;
+       }
+
+       private static Collection<Tuple3<String, String, String>> 
getExampleInputTuples() {
+               Collection<Tuple3<String, String, String>> inputTuples = new 
ArrayList<Tuple3<String, String, String>>();
+               inputTuples.add(new Tuple3<String, String, String>("John", 
"Doe", "Foo Str."));
+               inputTuples.add(new Tuple3<String, String, String>("Joe", 
"Johnson", ""));
+               inputTuples.add(new Tuple3<String, String, String>(null, "Kate 
Morn", "Bar Blvd."));
+               inputTuples.add(new Tuple3<String, String, String>("Tim", 
"Rinny", ""));
+               inputTuples.add(new Tuple3<String, String, String>("Alicia", 
"Jackson", "  "));
+               return inputTuples;
+       }
+
+       /**
+        * Filter that lets a tuple pass only if none of its fields is empty, i.e. {@code null} or
+        * blank after trimming. Every empty field it sees is counted per position in an
+        * accumulator registered under
+        * {@link EmptyFieldsCountAccumulator#EMPTY_FIELD_ACCUMULATOR}.
+        */
+       public static final class EmptyFieldFilter extends RichFilterFunction<Tuple> {
+
+               // each filter instance owns its accumulator; accumulators can be merged later on
+               private final VectorAccumulator emptyFieldCounter = new VectorAccumulator();
+
+               /**
+                * Registers this instance's accumulator with the runtime context.
+                */
+               @Override
+               public void open(final Configuration parameters) throws Exception {
+                       super.open(parameters);
+                       getRuntimeContext().addAccumulator(EMPTY_FIELD_ACCUMULATOR, this.emptyFieldCounter);
+               }
+
+               /**
+                * Checks all fields of the tuple and counts the empty ones.
+                *
+                * @param t the tuple to inspect; its fields are read as Strings
+                * @return {@code true} if the tuple has no empty field, {@code false} otherwise
+                */
+               @Override
+               public boolean filter(final Tuple t) {
+                       boolean complete = true;
+
+                       // no early exit: every empty field of the tuple has to be counted
+                       final int arity = t.getArity();
+                       for (int column = 0; column < arity; column++) {
+                               final String value = t.getField(column);
+                               final boolean empty = value == null || value.trim().isEmpty();
+                               if (empty) {
+                                       complete = false;
+                                       this.emptyFieldCounter.add(column);
+                               }
+                       }
+
+                       return complete;
+               }
+       }
+
+       /**
+        * This accumulator maintains a vector of counts. Calling {@link 
#add(Integer)} increments the
+        * <i>n</i>-th vector component. The size of the vector is 
automatically managed.
+        */
+       public static class VectorAccumulator implements Accumulator<Integer, 
ArrayList<Integer>> {
+
+               /** Stores the accumulated vector components. */
+               private final ArrayList<Integer> resultVector;
+
+               public VectorAccumulator(){
+                       this(new ArrayList<Integer>());
+               }
+
+               public VectorAccumulator(ArrayList<Integer> resultVector){
+                       this.resultVector = resultVector;
+               }
+
+               /**
+                * Increases the result vector component at the specified 
position by 1.
+                */
+               @Override
+               public void add(Integer position) {
+                       updateResultVector(position, 1);
+               }
+
+               /**
+                * Increases the result vector component at the specified 
position by the specified delta.
+                */
+               private void updateResultVector(int position, int delta) {
+                       // inflate the vector to contain the given position
+                       while (this.resultVector.size() <= position) {
+                               this.resultVector.add(0);
+                       }
+
+                       // increment the component value
+                       final int component = this.resultVector.get(position);
+                       this.resultVector.set(position, component + delta);
+               }
+
+               @Override
+               public ArrayList<Integer> getLocalValue() {
+                       return this.resultVector;
+               }
+
+               @Override
+               public void resetLocal() {
+                       // clear the result vector if the accumulator instance 
shall be reused
+                       this.resultVector.clear();
+               }
+
+               @Override
+               public void merge(final Accumulator<Integer, 
ArrayList<Integer>> other) {
+                       // merge two vector accumulators by adding their up 
their vector components
+                       final List<Integer> otherVector = other.getLocalValue();
+                       for (int index = 0; index < otherVector.size(); 
index++) {
+                               updateResultVector(index, 
otherVector.get(index));
+                       }
+               }
+
+               @Override
+               public Accumulator<Integer, ArrayList<Integer>> clone() {
+                       return new VectorAccumulator(new 
ArrayList<Integer>(resultVector));
+               }
+
+               @Override
+               public String toString() {
+                       return StringUtils.join(resultVector, ',');
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
new file mode 100644
index 0000000..495ff26
--- /dev/null
+++ 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.relational;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * This program implements a modified version of the TPC-H query 10.
+ * The original query can be found at
+ * <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 45).
+ * 
+ * <p>
+ * This program implements the following SQL equivalent:
+ * 
+ * <p>
+ * <pre>{@code
+ * SELECT 
+ *        c_custkey,
+ *        c_name, 
+ *        c_address,
+ *        n_name, 
+ *        c_acctbal,
+ *        SUM(l_extendedprice * (1 - l_discount)) AS revenue
+ * FROM   
+ *        customer, 
+ *        orders, 
+ *        lineitem, 
+ *        nation 
+ * WHERE 
+ *        c_custkey = o_custkey 
+ *        AND l_orderkey = o_orderkey 
+ *        AND YEAR(o_orderdate) > '1990' 
+ *        AND l_returnflag = 'R' 
+ *        AND c_nationkey = n_nationkey 
+ * GROUP BY 
+ *        c_custkey, 
+ *        c_name, 
+ *        c_acctbal, 
+ *        n_name, 
+ *        c_address
+ * }</pre>
+ *        
+ * <p>
+ * Compared to the original TPC-H query this version does not print 
+ * c_phone and c_comment, only filters by years greater than 1990 instead of
+ * a period of 3 months, and does not sort the result by revenue.
+ * 
+ * <p>
+ * Input files are plain text CSV files using the pipe character ('|') as field separator
+ * as generated by the TPC-H data generator which is available at
+ * <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
+ * 
+ * <p>
+ * Usage: <code>TPCHQuery10 &lt;customer-csv path&gt; &lt;orders-csv path&gt; 
&lt;lineitem-csv path&gt; &lt;nation-csv path&gt; &lt;result path&gt;</code><br>
+ *  
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li> tuple data types
+ * <li> inline-defined functions
+ * <li> projection and join projection
+ * <li> built-in aggregation functions
+ * </ul>
+ */
+@SuppressWarnings("serial")
+public class TPCHQuery10 {
+       
+       // 
*************************************************************************
+       //     PROGRAM
+       // 
*************************************************************************
+       
+       public static void main(String[] args) throws Exception {
+               
+               if(!parseParameters(args)) {
+                       return;
+               }
+               
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               // get customer data set: (custkey, name, address, nationkey, 
acctbal) 
+               DataSet<Tuple5<Integer, String, String, Integer, Double>> 
customers = getCustomerDataSet(env);
+               // get orders data set: (orderkey, custkey, orderdate)
+               DataSet<Tuple3<Integer, Integer, String>> orders = 
getOrdersDataSet(env);
+               // get lineitem data set: (orderkey, extendedprice, discount, 
returnflag)
+               DataSet<Tuple4<Integer, Double, Double, String>> lineitems = 
getLineitemDataSet(env);
+               // get nation data set: (nationkey, name)
+               DataSet<Tuple2<Integer, String>> nations = 
getNationsDataSet(env);
+
+               // orders filtered by year: (orderkey, custkey)
+               DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
+                               // filter by year
+                               orders.filter(
+                                                               new 
FilterFunction<Tuple3<Integer,Integer, String>>() {
+                                                                       
@Override
+                                                                       public 
boolean filter(Tuple3<Integer, Integer, String> o) {
+                                                                               
return Integer.parseInt(o.f2.substring(0, 4)) > 1990;
+                                                                       }
+                                                               })
+                               // project fields out that are no longer 
required
+                               .project(0,1);
+
+               // lineitems filtered by flag: (orderkey, revenue)
+               DataSet<Tuple2<Integer, Double>> lineitemsFilteredByFlag = 
+                               // filter by flag
+                               lineitems.filter(new 
FilterFunction<Tuple4<Integer, Double, Double, String>>() {
+                                                                               
@Override
+                                                                               
public boolean filter(Tuple4<Integer, Double, Double, String> l) {
+                                                                               
        return l.f3.equals("R");
+                                                                               
}
+                                                               })
+                               // compute revenue and project out return flag
+                               .map(new MapFunction<Tuple4<Integer, Double, 
Double, String>, Tuple2<Integer, Double>>() {
+                                                       @Override
+                                                       public Tuple2<Integer, 
Double> map(Tuple4<Integer, Double, Double, String> l) {
+                                                               // revenue per 
item = l_extendedprice * (1 - l_discount)
+                                                               return new 
Tuple2<Integer, Double>(l.f0, l.f1 * (1 - l.f2));
+                                                       }
+                                       });
+
+               // join orders with lineitems: (custkey, revenue)
+               DataSet<Tuple2<Integer, Double>> revenueByCustomer = 
+                               
ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
+                                                                       
.where(0).equalTo(0)
+                                                                       
.projectFirst(1).projectSecond(1);
+               
+               revenueByCustomer = 
revenueByCustomer.groupBy(0).aggregate(Aggregations.SUM, 1);
+
+               // join customer with nation (custkey, name, address, 
nationname, acctbal)
+               DataSet<Tuple5<Integer, String, String, String, Double>> 
customerWithNation = customers
+                                               .joinWithTiny(nations)
+                                               .where(3).equalTo(0)
+                                               
.projectFirst(0,1,2).projectSecond(1).projectFirst(4);
+
+               // join customer (with nation) with revenue (custkey, name, 
address, nationname, acctbal, revenue)
+               DataSet<Tuple6<Integer, String, String, String, Double, 
Double>> result = 
+                               customerWithNation.join(revenueByCustomer)
+                               .where(0).equalTo(0)
+                               .projectFirst(0,1,2,3,4).projectSecond(1);
+
+               // emit result
+               result.writeAsCsv(outputPath, "\n", "|");
+               
+               // execute program
+               env.execute("TPCH Query 10 Example");
+               
+       }
+       
+       // 
*************************************************************************
+       //     UTIL METHODS
+       // 
*************************************************************************
+       
+       private static String customerPath;
+       private static String ordersPath;
+       private static String lineitemPath;
+       private static String nationPath;
+       private static String outputPath;
+       
+       /**
+        * Interprets the command line, which must consist of exactly five paths: customer CSV,
+        * orders CSV, lineitem CSV, nation CSV and result path. On success the paths are stored
+        * in the corresponding static fields.
+        *
+        * @param programArguments the raw command-line arguments
+        * @return {@code true} if exactly five arguments were given; {@code false} otherwise, in
+        *         which case a hint has been printed to stderr
+        */
+       private static boolean parseParameters(String[] programArguments) {
+
+               // no arguments at all: explain where the input data comes from
+               if (programArguments.length == 0) {
+                       System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+                                       "  Due to legal restrictions, we can not ship generated data.\n" +
+                                       "  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+                                       "  Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
+                       return false;
+               }
+
+               // wrong number of arguments: print the usage line only
+               if (programArguments.length != 5) {
+                       System.err.println("Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
+                       return false;
+               }
+
+               customerPath = programArguments[0];
+               ordersPath = programArguments[1];
+               lineitemPath = programArguments[2];
+               nationPath = programArguments[3];
+               outputPath = programArguments[4];
+               return true;
+       }
+       
+       private static DataSet<Tuple5<Integer, String, String, Integer, 
Double>> getCustomerDataSet(ExecutionEnvironment env) {
+               return env.readCsvFile(customerPath)
+                                       .fieldDelimiter("|")
+                                       .includeFields("11110100")
+                                       .types(Integer.class, String.class, 
String.class, Integer.class, Double.class);
+       }
+       
+       private static DataSet<Tuple3<Integer, Integer, String>> 
getOrdersDataSet(ExecutionEnvironment env) {
+               return env.readCsvFile(ordersPath)
+                                       .fieldDelimiter("|")
+                                       .includeFields("110010000")
+                                       .types(Integer.class, Integer.class, 
String.class);
+       }
+
+       private static DataSet<Tuple4<Integer, Double, Double, String>> 
getLineitemDataSet(ExecutionEnvironment env) {
+               return env.readCsvFile(lineitemPath)
+                                       .fieldDelimiter("|")
+                                       .includeFields("1000011010000000")
+                                       .types(Integer.class, Double.class, 
Double.class, String.class);
+       }
+       
+       private static DataSet<Tuple2<Integer, String>> 
getNationsDataSet(ExecutionEnvironment env) {
+               return env.readCsvFile(nationPath)
+                                       .fieldDelimiter("|")
+                                       .includeFields("1100")
+                                       .types(Integer.class, String.class);
+       }
+                       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
new file mode 100644
index 0000000..fd11c4d
--- /dev/null
+++ 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.relational;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+
+/**
+ * This program implements a modified version of the TPC-H query 3. The
+ * example demonstrates how to assign names to fields by extending the Tuple 
class.
+ * The original query can be found at
+ * <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 29).
+ *
+ * <p>
+ * This program implements the following SQL equivalent:
+ *
+ * <p>
+ * <pre>{@code
+ * SELECT 
+ *      l_orderkey, 
+ *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
+ *      o_orderdate, 
+ *      o_shippriority 
+ * FROM customer, 
+ *      orders, 
+ *      lineitem 
+ * WHERE
+ *      c_mktsegment = '[SEGMENT]' 
+ *      AND c_custkey = o_custkey
+ *      AND l_orderkey = o_orderkey
+ *      AND o_orderdate < date '[DATE]'
+ *      AND l_shipdate > date '[DATE]'
+ * GROUP BY
+ *      l_orderkey, 
+ *      o_orderdate, 
+ *      o_shippriority;
+ * }</pre>
+ *
+ * <p>
+ * Compared to the original TPC-H query this version does not sort the result 
by revenue
+ * and orderdate.
+ *
+ * <p>
+ * Input files are plain text CSV files using the pipe character ('|') as field separator
+ * as generated by the TPC-H data generator which is available at
+ * <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
+ *
+ *  <p>
+ * Usage: <code>TPCHQuery3 &lt;lineitem-csv path&gt; &lt;customer-csv path&gt; 
&lt;orders-csv path&gt; &lt;result path&gt;</code><br>
+ *  
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li> custom data type derived from tuple data types
+ * <li> inline-defined functions
+ * <li> built-in aggregation functions
+ * </ul>
+ */
+@SuppressWarnings("serial")
+public class TPCHQuery3 {
+
+       // 
*************************************************************************
+       //     PROGRAM
+       // 
*************************************************************************
+       
+       public static void main(String[] args) throws Exception {
+               
+               if(!parseParameters(args)) {
+                       return;
+               }
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               // get input data
+               DataSet<Lineitem> lineitems = getLineitemDataSet(env);
+               DataSet<Order> orders = getOrdersDataSet(env);
+               DataSet<Customer> customers = getCustomerDataSet(env);
+               
+               // Filter market segment "AUTOMOBILE"
+               customers = customers.filter(
+                                                               new 
FilterFunction<Customer>() {
+                                                                       
@Override
+                                                                       public 
boolean filter(Customer c) {
+                                                                               
return c.getMktsegment().equals("AUTOMOBILE");
+                                                                       }
+                                                               });
+
+               // Filter all Orders with o_orderdate < 12.03.1995
+               orders = orders.filter(
+                                                       new 
FilterFunction<Order>() {
+                                                               private final 
DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+                                                               private final 
Date date = format.parse("1995-03-12");
+                                                               
+                                                               @Override
+                                                               public boolean 
filter(Order o) throws ParseException {
+                                                                       return 
format.parse(o.getOrderdate()).before(date);
+                                                               }
+                                                       });
+               
+               // Filter all Lineitems with l_shipdate > 12.03.1995
+               lineitems = lineitems.filter(
+                                                               new 
FilterFunction<Lineitem>() {
+                                                                       private 
final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+                                                                       private 
final Date date = format.parse("1995-03-12");
+                                                                       
+                                                                       
@Override
+                                                                       public 
boolean filter(Lineitem l) throws ParseException {
+                                                                               
return format.parse(l.getShipdate()).after(date);
+                                                                       }
+                                                               });
+
+               // Join customers with orders and package them into a 
ShippingPriorityItem
+               DataSet<ShippingPriorityItem> customerWithOrders = 
+                               customers.join(orders).where(0).equalTo(1)
+                                                       .with(
+                                                               new 
JoinFunction<Customer, Order, ShippingPriorityItem>() {
+                                                                       
@Override
+                                                                       public 
ShippingPriorityItem join(Customer c, Order o) {
+                                                                               
return new ShippingPriorityItem(o.getOrderKey(), 0.0, o.getOrderdate(),
+                                                                               
                o.getShippriority());
+                                                                       }
+                                                               });
+               
+               // Join the last join result with Lineitems
+               DataSet<ShippingPriorityItem> result = 
+                               
customerWithOrders.join(lineitems).where(0).equalTo(0)
+                                                                       .with(
+                                                                               
        new JoinFunction<ShippingPriorityItem, Lineitem, 
ShippingPriorityItem>() {
+                                                                               
                @Override
+                                                                               
                public ShippingPriorityItem join(ShippingPriorityItem i, 
Lineitem l) {
+                                                                               
                        i.setRevenue(l.getExtendedprice() * (1 - 
l.getDiscount()));
+                                                                               
                        return i;
+                                                                               
                }
+                                                                               
        })
+                                                               // Group by 
l_orderkey, o_orderdate and o_shippriority and compute revenue sum
+                                                               .groupBy(0, 2, 
3)
+                                                               
.aggregate(Aggregations.SUM, 1);
+               
+               // emit result
+               result.writeAsCsv(outputPath, "\n", "|");
+               
+               // execute program
+               env.execute("TPCH Query 3 Example");
+               
+       }
+       
+       // 
*************************************************************************
+       //     DATA TYPES
+       // 
*************************************************************************
+       
+       public static class Lineitem extends Tuple4<Long, Double, Double, 
String> {
+
+               public Long getOrderkey() { return this.f0; }
+               public Double getDiscount() { return this.f2; }
+               public Double getExtendedprice() { return this.f1; }
+               public String getShipdate() { return this.f3; }
+       }
+
+       public static class Customer extends Tuple2<Long, String> {
+               
+               public Long getCustKey() { return this.f0; }
+               public String getMktsegment() { return this.f1; }
+       }
+
+       public static class Order extends Tuple4<Long, Long, String, Long> {
+               
+               public Long getOrderKey() { return this.f0; }
+               public Long getCustKey() { return this.f1; }
+               public String getOrderdate() { return this.f2; }
+               public Long getShippriority() { return this.f3; }
+       }
+
+       /**
+        * Tuple (order key, revenue, order date, ship priority) with named accessors.
+        */
+       public static class ShippingPriorityItem extends Tuple4<Long, Double, String, Long> {
+
+               /** Creates an item without assigning any field. */
+               public ShippingPriorityItem() { }
+
+               /**
+                * Creates an item with all four fields assigned.
+                *
+                * @param o_orderkey the order key, stored in field 0
+                * @param revenue the revenue, stored in field 1
+                * @param o_orderdate the order date string, stored in field 2
+                * @param o_shippriority the ship priority, stored in field 3
+                */
+               public ShippingPriorityItem(Long o_orderkey, Double revenue,
+                               String o_orderdate, Long o_shippriority) {
+                       f0 = o_orderkey;
+                       f1 = revenue;
+                       f2 = o_orderdate;
+                       f3 = o_shippriority;
+               }
+
+               /** Returns the order key (tuple field 0). */
+               public Long getOrderkey() {
+                       return f0;
+               }
+
+               /** Replaces the order key (tuple field 0). */
+               public void setOrderkey(Long orderkey) {
+                       f0 = orderkey;
+               }
+
+               /** Returns the revenue (tuple field 1). */
+               public Double getRevenue() {
+                       return f1;
+               }
+
+               /** Replaces the revenue (tuple field 1). */
+               public void setRevenue(Double revenue) {
+                       f1 = revenue;
+               }
+
+               /** Returns the order date string (tuple field 2). */
+               public String getOrderdate() {
+                       return f2;
+               }
+
+               /** Returns the ship priority (tuple field 3). */
+               public Long getShippriority() {
+                       return f3;
+               }
+       }
+       
+       // 
*************************************************************************
+       //     UTIL METHODS
+       // 
*************************************************************************
+       
+       // Input and output locations of the program; assigned by parseParameters
+       // from the command line arguments (in the order declared here).
+       // Path of the line item CSV input.
+       private static String lineitemPath;
+       // Path of the customer CSV input.
+       private static String customerPath;
+       // Path of the orders CSV input.
+       private static String ordersPath;
+       // Path the result is written to.
+       private static String outputPath;
+       
+       /**
+        * Reads the file paths of the program from the command line arguments.
+        *
+        * <p>Exactly four arguments are expected, in this order: line item CSV path,
+        * customer CSV path, orders CSV path and result path. On success they are stored
+        * in the static path fields of this class. With no arguments, or with any other
+        * number of arguments, a message is printed to {@code System.err} instead.
+        *
+        * @param programArguments the command line arguments of the program
+        * @return {@code true} if all four paths were assigned, {@code false} otherwise
+        */
+       private static boolean parseParameters(String[] programArguments) {
+
+               if (programArguments.length == 0) {
+                       // Nothing to run on without arguments: explain where the input data comes from.
+                       System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+                                       "  Due to legal restrictions, we can not ship generated data.\n" +
+                                       "  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+                                       "  Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>");
+                       return false;
+               }
+
+               if (programArguments.length != 4) {
+                       // Some arguments were given, but not the expected four.
+                       System.err.println("Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>");
+                       return false;
+               }
+
+               lineitemPath = programArguments[0];
+               customerPath = programArguments[1];
+               ordersPath = programArguments[2];
+               outputPath = programArguments[3];
+               return true;
+       }
+       
+       private static DataSet<Lineitem> 
getLineitemDataSet(ExecutionEnvironment env) {
+               return env.readCsvFile(lineitemPath)
+                                       .fieldDelimiter("|")
+                                       .includeFields("1000011000100000")
+                                       .tupleType(Lineitem.class);
+       }
+       
+       private static DataSet<Customer> 
getCustomerDataSet(ExecutionEnvironment env) {
+               return env.readCsvFile(customerPath)
+                                       .fieldDelimiter("|")
+                                       .includeFields("10000010")
+                                       .tupleType(Customer.class);
+       }
+       
+       private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment 
env) {
+               return env.readCsvFile(ordersPath)
+                                       .fieldDelimiter("|")
+                                       .includeFields("110010010")
+                                       .tupleType(Order.class);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
new file mode 100644
index 0000000..d31ee55
--- /dev/null
+++ 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.relational;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.examples.java.relational.util.WebLogData;
+import org.apache.flink.util.Collector;
+
+/**
+ * This program processes web logs and relational data. 
+ * It implements the following relational query:
+ *
+ * <pre>{@code
+ * SELECT 
+ *       r.pageURL, 
+ *       r.pageRank, 
+ *       r.avgDuration
+ * FROM documents d JOIN rankings r
+ *                  ON d.url = r.pageURL
+ * WHERE CONTAINS(d.contents, [keywords])
+ *       AND r.pageRank > [rank]
+ *       AND NOT EXISTS 
+ *           (
+ *              SELECT * FROM Visits v
+ *              WHERE v.destUrl = d.url 
+ *                    AND v.visitDate < [date]
+ *           );
+ * }</pre>
+ *
+ * <p>
+ * Input files are plain text CSV files using the pipe character ('|') as 
field separator.
+ * The tables referenced in the query can be generated using the
+ * {@link org.apache.flink.examples.java.relational.util.WebLogDataGenerator} and
+ * have the following schemas:
+ * <pre>{@code
+ * CREATE TABLE Documents (
+ *                url VARCHAR(100) PRIMARY KEY,
+ *                contents TEXT );
+ *
+ * CREATE TABLE Rankings (
+ *                pageRank INT,
+ *                pageURL VARCHAR(100) PRIMARY KEY,
+ *                avgDuration INT );
+ *
+ * CREATE TABLE Visits (
+ *                sourceIP VARCHAR(16),
+ *                destURL VARCHAR(100),
+ *                visitDate DATE,
+ *                adRevenue FLOAT,
+ *                userAgent VARCHAR(64),
+ *                countryCode VARCHAR(3),
+ *                languageCode VARCHAR(6),
+ *                searchWord VARCHAR(32),
+ *                duration INT );
+ * }</pre>
+ * 
+ * <p>
+ * Usage: <code>WebLogAnalysis &lt;documents path&gt; &lt;ranks path&gt; 
&lt;visits path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from 
{@link WebLogData}.
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li> tuple data types
+ * <li> projection and join projection
+ * <li> the CoGroup transformation for an anti-join
+ * </ul>
+ * 
+ */
+@SuppressWarnings("serial")
+public class WebLogAnalysis {
+       
+       // 
*************************************************************************
+       //     PROGRAM
+       // 
*************************************************************************
+       
+       /**
+        * Builds and runs the web log analysis: documents are filtered by keywords, ranks by
+        * a minimum rank and visits by year; the filtered documents are joined with the ranks
+        * and the join result is anti-joined with the filtered visits.
+        *
+        * @param args either no arguments (built-in default data, result is printed) or the
+        *             four paths described in the usage message
+        * @throws Exception if building or executing the program fails
+        */
+       public static void main(String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               // URLs of all documents that contain the keywords
+               DataSet<Tuple1<String>> keywordDocUrls = getDocumentsDataSet(env)
+                               .filter(new FilterDocByKeyWords())
+                               .project(0);
+
+               // ranks above the minimum rank
+               DataSet<Tuple3<Integer, String, Integer>> highRanks = getRanksDataSet(env)
+                               .filter(new FilterByRank());
+
+               // URLs of all visits that pass the date filter
+               DataSet<Tuple1<String>> visitedUrls = getVisitsDataSet(env)
+                               .filter(new FilterVisitsByDate())
+                               .project(0);
+
+               // join on the URL and keep only the rank record, i.e., all URLs with min rank and keywords
+               DataSet<Tuple3<Integer, String, Integer>> rankedDocs = keywordDocUrls
+                               .join(highRanks)
+                               .where(0).equalTo(1)
+                               .projectSecond(0,1,2);
+
+               // anti-join on the URL, i.e., retain all URLs which have NOT been visited in a certain time
+               DataSet<Tuple3<Integer, String, Integer>> result = rankedDocs
+                               .coGroup(visitedUrls)
+                               .where(1).equalTo(0)
+                               .with(new AntiJoinVisits());
+
+               if (!fileOutput) {
+                       // default data: print the result; no explicit execute call is made in this case
+                       result.print();
+                       return;
+               }
+
+               // file mode: write the result as CSV and execute the program
+               result.writeAsCsv(outputPath, "\n", "|");
+               env.execute("WebLogAnalysis Example");
+       }
+       
+       // 
*************************************************************************
+       //     USER FUNCTIONS
+       // 
*************************************************************************
+       
+       /**
+        * FilterFunction that keeps only the documents whose text contains every
+        * keyword of a fixed keyword set.
+        */
+       public static class FilterDocByKeyWords implements FilterFunction<Tuple2<String, String>> {
+
+               private static final String[] KEYWORDS = { " editors ", " oscillations " };
+
+               /**
+                * Accepts a document only if its text contains all of the keywords.
+                * The record itself is passed on unchanged.
+                *
+                * Record Format:
+                * 0: URL
+                * 1: DOCUMENT_TEXT
+                */
+               @Override
+               public boolean filter(Tuple2<String, String> value) throws Exception {
+                       final String text = value.f1;
+                       // reject the document as soon as one keyword is missing
+                       for (int i = 0; i < KEYWORDS.length; i++) {
+                               if (text.indexOf(KEYWORDS[i]) < 0) {
+                                       return false;
+                               }
+                       }
+                       return true;
+               }
+       }
+
+       /**
+        * FilterFunction that keeps only the rank records whose rank exceeds a fixed threshold.
+        */
+       public static class FilterByRank implements FilterFunction<Tuple3<Integer, String, Integer>> {
+
+               private static final int RANKFILTER = 40;
+
+               /**
+                * Accepts a record of the rank relation only if its rank is strictly
+                * greater than the threshold.
+                *
+                * Record Format:
+                * 0: RANK
+                * 1: URL
+                * 2: AVG_DURATION
+                */
+               @Override
+               public boolean filter(Tuple3<Integer, String, Integer> value) throws Exception {
+                       final int rank = value.f0;
+                       return RANKFILTER < rank;
+               }
+       }
+
+       /**
+        * FilterFunction that keeps only the visit records whose year
+        * (taken from the date string) equals a fixed value.
+        */
+       public static class FilterVisitsByDate implements FilterFunction<Tuple2<String, String>> {
+
+               private static final int YEARFILTER = 2007;
+
+               /**
+                * Accepts a record of the visits relation only if the year of the visit
+                * equals the configured year.
+                *
+                * Record Format:
+                * 0: URL
+                * 1: DATE
+                */
+               @Override
+               public boolean filter(Tuple2<String, String> value) throws Exception {
+                       // the date string has the format YYYY-MM-DD, so the year is its first four characters
+                       final String yearPart = value.f1.substring(0, 4);
+                       return Integer.parseInt(yearPart) == YEARFILTER;
+               }
+       }
+
+
+       /**
+        * CoGroupFunction that realizes an anti-join.
+        * If the second input (visits) does not provide any record for a key, all records of the
+        * first input (ranks) are emitted. Otherwise, nothing is emitted for that key.
+        */
+       @ForwardedFieldsFirst("*")
+       public static class AntiJoinVisits implements CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, Tuple3<Integer, String, Integer>> {
+
+               /**
+                * Emits every rank record of the group if the visit side of the group is empty,
+                * and nothing at all if there is at least one visit.
+                *
+                * Output Format:
+                * 0: RANK
+                * 1: URL
+                * 2: AVG_DURATION
+                */
+               @Override
+               public void coGroup(Iterable<Tuple3<Integer, String, Integer>> ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
+                       final boolean visited = visits.iterator().hasNext();
+                       if (visited) {
+                               // the URL has a matching visit, so it is dropped
+                               return;
+                       }
+                       for (Tuple3<Integer, String, Integer> rank : ranks) {
+                               out.collect(rank);
+                       }
+               }
+       }
+
+       // 
*************************************************************************
+       //     UTIL METHODS
+       // 
*************************************************************************
+       
+       // Set to true by parseParameters as soon as any command line argument is given.
+       // The data set getters read from files when it is set and use the built-in
+       // WebLogData otherwise; main writes a CSV file when it is set and prints otherwise.
+       private static boolean fileOutput = false;
+       // Paths taken from the command line arguments, in this order.
+       private static String documentsPath;
+       private static String ranksPath;
+       private static String visitsPath;
+       private static String outputPath;
+       
+       /**
+        * Evaluates the command line arguments.
+        *
+        * <p>Without arguments the program runs on the built-in default data and a hint is
+        * printed to {@code System.out}. With arguments, file mode is switched on and exactly
+        * four paths are expected (documents, ranks, visits, result); any other number of
+        * arguments prints a usage message to {@code System.err}.
+        *
+        * @param args the command line arguments of the program
+        * @return {@code false} if arguments were given but not exactly four, {@code true} otherwise
+        */
+       private static boolean parseParameters(String[] args) {
+
+               if (args.length == 0) {
+                       System.out.println("Executing WebLog Analysis example with built-in default data.");
+                       System.out.println("  Provide parameters to read input data from files.");
+                       System.out.println("  See the documentation for the correct format of input files.");
+                       System.out.println("  We provide a data generator to create synthetic input files for this program.");
+                       System.out.println("  Usage: WebLogAnalysis <documents path> <ranks path> <visits path> <result path>");
+                       return true;
+               }
+
+               // any argument switches to file mode, even if the argument count turns out to be wrong
+               fileOutput = true;
+
+               if (args.length != 4) {
+                       System.err.println("Usage: WebLogAnalysis <documents path> <ranks path> <visits path> <result path>");
+                       return false;
+               }
+
+               documentsPath = args[0];
+               ranksPath = args[1];
+               visitsPath = args[2];
+               outputPath = args[3];
+               return true;
+       }
+       
+       /**
+        * Creates the data set for the documents relation (URL, Doc-Text).
+        * In file mode it is read from the '|'-separated CSV file at {@code documentsPath};
+        * otherwise the built-in data of {@link WebLogData} is used.
+        *
+        * @param env the execution environment the source is created in
+        * @return the documents data set
+        */
+       private static DataSet<Tuple2<String, String>> getDocumentsDataSet(ExecutionEnvironment env) {
+               if (!fileOutput) {
+                       return WebLogData.getDocumentDataSet(env);
+               }
+               return env.readCsvFile(documentsPath)
+                               .fieldDelimiter("|")
+                               .types(String.class, String.class);
+       }
+       
+       /**
+        * Creates the data set for the ranks relation (Rank, URL, Avg-Visit-Duration).
+        * In file mode it is read from the '|'-separated CSV file at {@code ranksPath};
+        * otherwise the built-in data of {@link WebLogData} is used.
+        *
+        * @param env the execution environment the source is created in
+        * @return the ranks data set
+        */
+       private static DataSet<Tuple3<Integer, String, Integer>> getRanksDataSet(ExecutionEnvironment env) {
+               if (!fileOutput) {
+                       return WebLogData.getRankDataSet(env);
+               }
+               return env.readCsvFile(ranksPath)
+                               .fieldDelimiter("|")
+                               .types(Integer.class, String.class, Integer.class);
+       }
+
+       /**
+        * Creates the data set for the visits relation (URL, Date).
+        * In file mode it is read from the '|'-separated CSV file at {@code visitsPath}, where
+        * the include mask marks only the columns at zero-based positions 1 and 2;
+        * otherwise the built-in data of {@link WebLogData} is used.
+        *
+        * @param env the execution environment the source is created in
+        * @return the visits data set
+        */
+       private static DataSet<Tuple2<String, String>> getVisitsDataSet(ExecutionEnvironment env) {
+               if (!fileOutput) {
+                       return WebLogData.getVisitDataSet(env);
+               }
+               return env.readCsvFile(visitsPath)
+                               .fieldDelimiter("|")
+                               .includeFields("011000000")
+                               .types(String.class, String.class);
+       }
+               
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogData.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogData.java
 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogData.java
new file mode 100644
index 0000000..ce11953
--- /dev/null
+++ 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogData.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.relational.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Provides the default data sets used for the Weblog Analysis example program.
+ * The default data sets are used, if no parameters are given to the program.
+ *
+ */
+public class WebLogData {
+
+       public static final Object [][] DOCUMENTS = {
+                       new Object [] {"url_0","dolor ad amet enim laoreet 
nostrud veniam aliquip ex nonummy diam dolore tincidunt tation exerci exerci 
wisi dolor nostrud "},
+                       new Object [] {"url_1","wisi minim adipiscing nibh 
adipiscing ut nibh Lorem Ut nonummy euismod nibh wisi sit consectetuer exerci 
sed aliquip aliquip dolore aliquam enim dolore veniam aliquam euismod suscipit 
ad adipiscing exerci aliquip consectetuer euismod aliquip ad exerci ex nibh ex 
erat exerci laoreet lobortis quis "},
+                       new Object [] {"url_2","diam sed convection aliquip 
amet commodo nonummy sed sed commodo commodo diam commodo adipiscing ad exerci 
magna exerci tation quis lobortis "},
+                       new Object [] {"url_3","exerci suscipit sed lobortis 
amet lobortis aliquip nibh nostrud ad convection commodo ad nibh sed minim amet 
ad ea ea "},
+                       new Object [] {"url_4","sit enim dolor quis laoreet 
ullamcorper veniam adipiscing ex quis commodo "},
+                       new Object [] {"url_5","elit aliquip ea nisl 
oscillations sit dolor ipsum tincidunt ullamcorper dolore enim adipiscing 
laoreet elit ea volutpat adipiscing ea nibh nostrud Ut aliquam veniam Lorem 
laoreet veniam aliquip "},
+                       new Object [] {"url_6","consectetuer ad sed suscipit 
euismod aliquip quis ullamcorper oscillations tation consectetuer tation amet 
suscipit nibh enim nonummy veniam commodo commodo diam euismod dolor Ut aliquip 
diam ex ad nonummy ad tincidunt minim exerci consectetuer veniam convection 
aliquam ut ut Lorem euismod sed ipsum volutpat "},
+                       new Object [] {"url_7","Ut volutpat veniam ut 
consectetuer diam ut aliquam dolor nostrud erat consectetuer adipiscing exerci 
consectetuer Ut ullamcorper suscipit aliquam sed dolor nisl "},
+                       new Object [] {"url_8","suscipit amet wisi nisl veniam 
lobortis sit Lorem aliquam nostrud aliquam ipsum ut laoreet suscipit Lorem 
laoreet editors adipiscing ullamcorper veniam erat consectetuer ut lobortis 
dolore elit sed tincidunt ipsum tation ullamcorper nonummy adipiscing ex ad 
laoreet ipsum suscipit lobortis lobortis Ut nonummy adipiscing erat volutpat 
aliquam "},
+                       new Object [] {"url_9","nonummy commodo tation editors 
ut quis sit quis lobortis ea dolore oscillations diam ad dolor lobortis nisl ad 
veniam ullamcorper quis magna volutpat sit ipsum consectetuer dolore exerci 
commodo magna erat enim ut suscipit "},
+                       new Object [] {"url_10","amet erat magna consectetuer 
tation tation aliquip nibh aliquam sed adipiscing ut commodo ex erat tincidunt 
aliquam ipsum Ut Ut sit tincidunt adipiscing suscipit minim sed erat dolor 
consectetuer Lorem consectetuer Lorem amet nibh diam ea ex enim suscipit wisi 
dolor nonummy magna enim euismod ullamcorper ut suscipit adipiscing "},
+                       new Object [] {"url_11","ex quis exerci tation diam 
elit nostrud nostrud ut ipsum elit amet diam laoreet amet consectetuer volutpat 
sed lobortis "},
+                       new Object [] {"url_12","elit suscipit sit ullamcorper 
ut ad erat ut dolor nostrud quis nisl enim erat dolor convection ad minim ut 
veniam nostrud sed editors adipiscing volutpat Ut aliquip commodo sed euismod 
adipiscing erat adipiscing dolore nostrud minim sed lobortis ea diam "},
+                       new Object [] {"url_13","enim ut quis commodo veniam 
minim erat lobortis ad diam ex dolor tincidunt exerci ut aliquip tincidunt 
minim ut magna sed enim wisi veniam oscillations Lorem consectetuer "},
+                       new Object [] {"url_14","nibh ipsum ullamcorper 
volutpat ut wisi dolor quis amet euismod quis ipsum ipsum minim tation volutpat 
sit exerci volutpat amet nonummy euismod veniam consectetuer sit consectetuer 
tincidunt nibh aliquam lobortis tation veniam ut ullamcorper wisi magna Ut 
volutpat consectetuer erat quis dolore ea tation "},
+                       new Object [] {"url_15","ad wisi sed enim aliquam 
oscillations nibh Lorem lobortis veniam nibh laoreet nonummy sed nibh Lorem 
adipiscing diam magna nostrud magna oscillations ut oscillations elit nostrud 
diam editors Lorem "},
+                       new Object [] {"url_16","nostrud volutpat veniam exerci 
tincidunt nostrud quis elit ipsum ea nonummy volutpat dolor elit lobortis magna 
nisl ut ullamcorper magna Lorem exerci nibh nisl magna editors erat aliquam 
aliquam ullamcorper sit aliquam sit nostrud oscillations consectetuer 
adipiscing suscipit convection exerci ea ullamcorper ex nisl "},
+                       new Object [] {"url_17","ad ex aliquam erat aliquam 
elit veniam laoreet ut amet amet nostrud ut adipiscing Ut Lorem suscipit ex 
magna ullamcorper aliquam ullamcorper ullamcorper amet amet commodo aliquam 
volutpat nonummy nonummy tincidunt amet tation tincidunt volutpat ut veniam 
nisl erat dolor enim nonummy nostrud adipiscing laoreet adipiscing "},
+                       new Object [] {"url_18","lobortis ipsum ex tincidunt 
tincidunt editors euismod consectetuer ipsum adipiscing lobortis exerci 
adipiscing nonummy nisl dolore nonummy erat exerci nisl ut dolore wisi volutpat 
lobortis magna "},
+                       new Object [] {"url_19","ipsum tation laoreet tation 
adipiscing wisi nibh diam Ut suscipit ad wisi "},
+                       new Object [] {"url_20","diam Lorem enim wisi ad 
lobortis dolor Ut ipsum amet dolore consectetuer nisl exerci nisl nonummy minim 
Ut erat oscillations ut Lorem nostrud dolore Ut dolore exerci ad ipsum dolore 
ex dolore aliquip sed aliquam ex aliquip magna amet ex dolore oscillations 
aliquip tation magna Ut "},
+                       new Object [] {"url_21","lobortis ut amet ex nisl 
ullamcorper tincidunt ut elit diam quis suscipit ad amet ipsum magna Ut ex 
tincidunt "},
+                       new Object [] {"url_22","amet commodo nisl ad quis 
lobortis ut commodo sit ut erat exerci lobortis suscipit nibh ut nostrud ut 
adipiscing commodo commodo quis quis nostrud nisl ipsum nostrud laoreet Lorem 
nostrud erat nostrud amet consectetuer laoreet oscillations wisi sit magna nibh 
amet "},
+                       new Object [] {"url_23","adipiscing suscipit suscipit 
aliquip suscipit consectetuer minim magna ea erat nibh sit suscipit sed dolor 
oscillations nonummy volutpat ut tincidunt "},
+                       new Object [] {"url_24","commodo sed tincidunt aliquip 
aliquip dolore commodo nonummy sed erat ut ex exerci dolore adipiscing 
tincidunt ex diam amet aliquam "},
+                       new Object [] {"url_25","consectetuer consectetuer 
exerci quis ea veniam aliquam laoreet minim ex "},
+                       new Object [] {"url_26","dolor exerci euismod minim 
magna quis erat consectetuer sed ex erat dolore quis ut oscillations 
ullamcorper Lorem exerci ex nibh ut exerci ullamcorper veniam nibh ut commodo 
ut Ut nostrud tincidunt tincidunt ad dolore Lorem ea tation enim erat nibh ut 
ea nonummy sed sed wisi nisl dolore "},
+                       new Object [] {"url_27","amet elit ea ea nostrud 
editors Ut nostrud amet laoreet adipiscing ut nisl nonummy tincidunt ea ipsum 
ex dolore dolore oscillations sit minim Ut wisi ut laoreet minim elit "},
+                       new Object [] {"url_28","wisi exerci volutpat Ut 
nostrud euismod minim Ut sit euismod ut ea magna consectetuer nisl ad minim 
tation nisl adipiscing Lorem aliquam quis exerci erat minim aliquip sit Lorem 
wisi wisi ut "},
+                       new Object [] {"url_29","amet sed laoreet amet aliquam 
minim enim tincidunt Lorem sit aliquip amet suscipit ut laoreet elit suscipit 
erat ut tincidunt suscipit ipsum sed euismod elit dolore euismod dolore ut 
dolor nostrud ipsum tincidunt commodo adipiscing aliquam ut wisi dolor dolor 
suscipit "},
+                       new Object [] {"url_30","euismod Lorem ex tincidunt 
amet enim minim suscipit exerci diam veniam amet nostrud ea ea "},
+                       new Object [] {"url_31","ex ipsum sit euismod euismod 
ullamcorper tincidunt ut wisi ea adipiscing sed diam tation ipsum dolor aliquam 
veniam nonummy aliquip aliquip Lorem ut minim nisl tation sit exerci 
ullamcorper Ut dolor euismod aliquam consectetuer ad nonummy commodo exerci "},
+                       new Object [] {"url_32","volutpat ipsum lobortis nisl 
veniam minim adipiscing dolor editors quis nostrud amet nostrud "},
+                       new Object [] {"url_33","commodo wisi aliquip ut 
aliquam sed nostrud ex diam ad nostrud enim ut amet enim ea ad sed tation 
nostrud suscipit ea magna magna Lorem amet lobortis ut quis nibh aliquam 
aliquam exerci aliquip lobortis consectetuer enim wisi ea nisl laoreet erat 
dolore "},
+                       new Object [] {"url_34","tincidunt adipiscing enim 
tation nibh Ut dolore tincidunt tation laoreet suscipit minim aliquam volutpat 
laoreet suscipit tincidunt nibh ut ut sit nostrud nonummy tincidunt exerci sit 
ad sed consectetuer minim dolor dolore laoreet nostrud nibh laoreet ea 
adipiscing exerci dolore ipsum "},
+                       new Object [] {"url_35","tation ut erat ut tation dolor 
Lorem laoreet Lorem elit adipiscing wisi aliquip nostrud elit Ut volutpat ea 
aliquam aliquip "},
+                       new Object [] {"url_36","lobortis enim ullamcorper 
adipiscing consectetuer aliquip wisi enim minim Ut minim elit elit aliquam 
exerci ullamcorper amet lobortis adipiscing diam laoreet consectetuer nostrud 
diam diam amet ut enim ullamcorper aliquip diam ut nostrud diam magna amet 
nonummy commodo wisi enim ullamcorper suscipit euismod dolore tincidunt magna 
suscipit elit "},
+                       new Object [] {"url_37","elit adipiscing nisl nisl ex 
aliquip nibh sed ut ad Lorem elit consectetuer ad volutpat lobortis amet veniam 
ipsum nibh ut consectetuer editors ad aliquam "},
+                       new Object [] {"url_38","elit quis nibh adipiscing sit 
consectetuer ut euismod quis tincidunt quis nisl consectetuer dolor diam 
suscipit quis dolore Lorem suscipit nonummy sed ex "},
+                       new Object [] {"url_39","nisl sit consectetuer elit 
oscillations enim ipsum enim nostrud adipiscing nostrud editors aliquam "},
+                       new Object [] {"url_40","sed wisi dolor diam commodo 
ullamcorper commodo nostrud ullamcorper laoreet minim dolore suscipit laoreet 
tation aliquip "},
+                       new Object [] {"url_41","ad consectetuer exerci nisl 
exerci amet enim diam lobortis Lorem ex volutpat volutpat nibh aliquam ut 
ullamcorper volutpat nostrud ut adipiscing ullamcorper "},
+                       new Object [] {"url_42","minim laoreet tation magna 
veniam ut ea sit ipsum tincidunt Ut amet ex aliquip ex euismod exerci wisi elit 
editors ad amet veniam ad editors "},
+                       new Object [] {"url_43","ut nisl ad ullamcorper nibh Ut 
editors exerci enim exerci ea laoreet veniam ea amet exerci volutpat amet ad "},
+                       new Object [] {"url_44","volutpat tincidunt enim amet 
sed tincidunt consectetuer ullamcorper nisl Ut adipiscing tation ad ad amet 
nonummy elit erat nibh Lorem erat elit laoreet consectetuer sed aliquip nostrud 
"},
+                       new Object [] {"url_45","sed aliquam ut ut consectetuer 
wisi euismod enim erat euismod quis exerci amet tation sit "},
+                       new Object [] {"url_46","lobortis oscillations tation 
aliquam dolore Lorem aliquip tation exerci ullamcorper aliquam aliquip lobortis 
ex tation dolor ut ut sed suscipit nisl ullamcorper sed editors laoreet aliquip 
enim dolor veniam tincidunt sed euismod tation "},
+                       new Object [] {"url_47","Lorem Lorem ut wisi ad ut 
tation consectetuer exerci convection tation ullamcorper sed dolore quis 
aliquam ipsum lobortis commodo nonummy "},
+                       new Object [] {"url_48","laoreet minim veniam nisl elit 
sit amet commodo ex ullamcorper suscipit aliquip laoreet convection Ut ex minim 
aliquam "},
+                       new Object [] {"url_49","lobortis nonummy minim amet 
sit veniam quis consectetuer tincidunt laoreet quis "},
+                       new Object [] {"url_50","lobortis nisl commodo dolor 
amet nibh editors enim magna minim elit euismod diam laoreet laoreet ad minim 
sed ut Ut lobortis adipiscing quis sed ut aliquam oscillations exerci tation 
consectetuer lobortis elit tincidunt consectetuer minim amet dolore quis 
aliquam Ut exerci sed aliquam quis quis ullamcorper Ut ex tincidunt "},
+                       new Object [] {"url_51","nostrud nisl ea erat ut 
suscipit Ut sit oscillations ullamcorper nonummy magna lobortis dolore editors 
tincidunt nostrud suscipit ex quis tation ut sit amet nostrud laoreet ex 
tincidunt "},
+                       new Object [] {"url_52","ea tation commodo elit sed ex 
sed quis enim nisl magna laoreet adipiscing amet sit nostrud consectetuer nibh 
tincidunt veniam ex veniam euismod exerci sed dolore suscipit nisl tincidunt 
euismod quis Ut enim euismod dolor diam exerci magna exerci ut exerci nisl "},
+                       new Object [] {"url_53","volutpat amet Ut lobortis 
dolor tation minim nonummy lobortis convection nostrud "},
+                       new Object [] {"url_54","ullamcorper commodo Ut amet 
sit nostrud aliquam ad amet wisi enim nostrud ipsum nisl veniam erat aliquam ex 
aliquam dolor dolor ut consectetuer euismod exerci elit exerci Ut ea minim enim 
consectetuer ad consectetuer nonummy convection adipiscing ad ullamcorper 
lobortis nonummy laoreet nonummy aliquam ullamcorper ad nostrud amet "},
+                       new Object [] {"url_55","wisi magna editors amet 
aliquam diam amet aliquip nisl consectetuer laoreet nonummy suscipit euismod 
diam enim tation elit ut lobortis quis euismod suscipit nostrud ea ea commodo 
lobortis dolore Ut nisl nostrud dolor laoreet euismod ea dolore aliquam ut 
Lorem exerci ex sit "},
+                       new Object [] {"url_56","ex dolor veniam wisi laoreet 
ut exerci diam ad ex ut ut laoreet ut nisl ullamcorper nisl "},
+                       new Object [] {"url_57","diam adipiscing Ut ut Lorem 
amet erat elit erat magna adipiscing euismod elit ullamcorper nostrud aliquam 
dolor ullamcorper sit tation tation "},
+                       new Object [] {"url_58","laoreet convection veniam 
lobortis dolore ut nonummy commodo erat lobortis veniam nostrud dolore minim 
commodo ut consectetuer magna erat ea dolore Lorem suscipit ex ipsum exerci sed 
enim ea tation suscipit enim adipiscing "},
+                       new Object [] {"url_59","amet ut ut Ut ad dolor quis ad 
magna exerci suscipit magna nibh commodo euismod amet euismod wisi diam 
suscipit dolore Lorem dolor ex amet exerci aliquip ut ut lobortis quis elit 
minim sed Lorem "},
+                       new Object [] {"url_60","ut ut amet ullamcorper amet 
euismod dolor amet elit exerci adipiscing sed suscipit sed exerci wisi diam 
veniam wisi suscipit ut quis nibh ullamcorper ex quis magna dolore volutpat 
editors minim ut sit aliquip oscillations nisl ipsum "},
+                       new Object [] {"url_61","nibh nostrud tincidunt 
lobortis adipiscing adipiscing ullamcorper ullamcorper ipsum nisl ullamcorper 
aliquip laoreet commodo ut tation wisi diam commodo aliquip commodo suscipit 
tincidunt volutpat elit enim laoreet ut nostrud ad nonummy ipsum "},
+                       new Object [] {"url_62","Ut ut minim enim amet euismod 
erat elit commodo consectetuer Ut quis dolor ex diam quis wisi tation tincidunt 
laoreet volutpat "},
+                       new Object [] {"url_63","ut erat volutpat euismod amet 
ea nonummy lobortis ut Ut ea veniam sed veniam nostrud "},
+                       new Object [] {"url_64","tation dolor suscipit minim 
nisl wisi consectetuer aliquip tation Ut commodo ut dolore consectetuer elit 
wisi nisl ipsum "},
+                       new Object [] {"url_65","ullamcorper nisl Lorem magna 
tation veniam aliquam diam amet euismod "},
+                       new Object [] {"url_66","euismod aliquam tincidunt Ut 
volutpat ea lobortis sit ut volutpat ut lobortis ut lobortis ut nisl amet dolor 
sed ipsum enim ullamcorper diam euismod nostrud wisi erat quis diam nibh Ut 
dolore sed amet tation enim diam "},
+                       new Object [] {"url_67","amet minim minim amet laoreet 
Lorem aliquam veniam elit volutpat magna adipiscing enim enim euismod laoreet 
sed ex sed aliquam ad ea ut adipiscing suscipit ex minim dolore minim ea 
laoreet nisl "},
+                       new Object [] {"url_68","aliquam ea volutpat ut wisi 
tation tation nibh nisl erat laoreet ea volutpat dolor dolor aliquam exerci 
quis ullamcorper aliquam ut quis suscipit "},
+                       new Object [] {"url_69","quis exerci ut aliquip wisi 
dolore magna nibh consectetuer magna tation ullamcorper lobortis sed amet 
adipiscing minim suscipit nibh nibh nostrud euismod enim "},
+                       new Object [] {"url_70","tation enim consectetuer 
adipiscing wisi laoreet diam aliquip nostrud elit nostrud aliquip ea minim amet 
diam dolore "},
+                       new Object [] {"url_71","consectetuer tincidunt nibh 
amet tation nonummy sit tation diam sed diam tation "},
+                       new Object [] {"url_72","Lorem ut nostrud nonummy minim 
quis euismod lobortis nostrud nonummy adipiscing tincidunt consectetuer ut nibh 
ad suscipit dolor ut elit dolore amet ut quis tation ullamcorper nonummy 
laoreet ullamcorper aliquam dolore convection dolor tincidunt ut ullamcorper ex 
dolor suscipit erat oscillations ad "},
+                       new Object [] {"url_73","elit Ut commodo ut ullamcorper 
ullamcorper ut euismod commodo diam aliquip suscipit consectetuer exerci tation 
nostrud ut wisi exerci sed ut elit sed volutpat Lorem nibh laoreet consectetuer 
ex Lorem elit aliquam commodo lobortis ad "},
+                       new Object [] {"url_74","quis magna laoreet commodo 
aliquam nisl ullamcorper veniam tation wisi consectetuer commodo consectetuer 
ad dolore aliquam dolor elit amet sit amet nibh commodo erat veniam aliquip 
dolore ad magna ad ipsum Ut exerci ea volutpat nisl amet nostrud sit "},
+                       new Object [] {"url_75","tincidunt suscipit sit aliquip 
aliquam adipiscing dolore exerci Ut suscipit ut sit laoreet suscipit wisi sit 
enim nonummy consectetuer dolore editors "},
+                       new Object [] {"url_76","veniam ullamcorper tation sit 
suscipit dolor suscipit veniam sit Lorem quis sed nostrud ad tincidunt elit 
adipiscing "},
+                       new Object [] {"url_77","volutpat sit amet veniam quis 
ipsum nibh elit enim commodo magna veniam magna convection "},
+                       new Object [] {"url_78","tation dolore minim elit nisl 
volutpat tation laoreet enim nostrud exerci dolore tincidunt aliquip Lorem 
ipsum nostrud quis adipiscing ullamcorper erat lobortis tation commodo Ut ipsum 
commodo magna ad ipsum ut enim "},
+                       new Object [] {"url_79","lobortis amet elit Lorem amet 
nonummy commodo tation ex ea amet Lorem ea nonummy commodo veniam volutpat nibh 
wisi ad ipsum euismod ea convection nostrud nisl erat veniam Ut aliquip ad 
aliquip editors wisi magna tation nostrud nonummy adipiscing ullamcorper 
aliquip "},
+                       new Object [] {"url_80","tincidunt nostrud nostrud 
magna ea euismod ea consectetuer nisl exerci ea dolor nisl commodo ex erat 
ipsum exerci suscipit ad nisl ea nonummy suscipit adipiscing laoreet sit 
euismod nibh adipiscing sed minim commodo amet "},
+                       new Object [] {"url_81","nostrud erat ut sed editors 
erat amet magna lobortis diam laoreet dolor amet nibh ut ipsum ipsum amet ut 
sed ut exerci elit suscipit wisi magna ut veniam nisl commodo enim adipiscing 
laoreet ad Lorem oscillations "},
+                       new Object [] {"url_82","quis commodo nibh nibh 
volutpat suscipit dolore magna tincidunt nibh ut ad ullamcorper ullamcorper 
quis enim ad ut tation minim laoreet veniam dolor sed tincidunt exerci exerci 
nostrud ullamcorper amet ut ut ullamcorper "},
+                       new Object [] {"url_83","sit suscipit volutpat elit 
tation elit sed sed dolor ex ex ipsum euismod laoreet magna lobortis ad "},
+                       new Object [] {"url_84","lobortis ipsum euismod enim ea 
tation veniam tation oscillations aliquip consectetuer euismod ut sed lobortis 
tation oscillations commodo euismod laoreet suscipit amet elit ullamcorper 
volutpat aliquam ea enim ullamcorper consectetuer laoreet tation quis ut 
commodo erat euismod dolor laoreet ullamcorper laoreet "},
+                       new Object [] {"url_85","adipiscing sit quis commodo 
consectetuer quis enim euismod exerci nonummy ea nostrud Ut veniam sit aliquip 
nisl enim "},
+                       new Object [] {"url_86","nostrud dolore veniam veniam 
wisi aliquip adipiscing diam sed quis ullamcorper "},
+                       new Object [] {"url_87","quis Lorem suscipit Ut nibh 
diam euismod consectetuer lobortis ipsum sed suscipit consectetuer euismod 
laoreet ut wisi nisl elit quis commodo adipiscing adipiscing suscipit aliquam 
nisl quis magna ipsum enim ad quis ea magna Lorem nibh ea "},
+                       new Object [] {"url_88","euismod commodo sed tincidunt 
Ut veniam consectetuer quis erat ex ea erat laoreet commodo nibh minim "},
+                       new Object [] {"url_89","tation diam editors Ut enim 
nibh Lorem volutpat quis diam suscipit exerci wisi ad "},
+                       new Object [] {"url_90","volutpat editors ea nibh wisi 
ad amet volutpat nisl ullamcorper nibh volutpat minim ex ut sit veniam Lorem 
consectetuer quis ad sit suscipit volutpat wisi diam sed tincidunt ipsum minim 
convection ea diam oscillations quis lobortis "},
+                       new Object [] {"url_91","enim minim nonummy ea minim 
euismod adipiscing editors volutpat magna sit magna ut ipsum ut "},
+                       new Object [] {"url_92","nisl Ut commodo amet euismod 
lobortis ea ea wisi commodo Lorem sit ipsum volutpat nonummy exerci erat elit 
exerci magna ad erat enim laoreet quis nostrud wisi ut veniam amet ullamcorper 
lobortis ad suscipit volutpat veniam nostrud nibh quis ipsum dolore 
consectetuer veniam ipsum aliquip dolore sed laoreet ipsum "},
+                       new Object [] {"url_93","nonummy aliquam ad lobortis 
Lorem erat ad tation Lorem exerci ex "},
+                       new Object [] {"url_94","nonummy dolore commodo exerci 
ex quis ut suscipit elit laoreet sit tation magna veniam ea sit nonummy veniam 
Lorem quis nibh aliquip exerci amet ullamcorper adipiscing erat nisl editors 
diam commodo ad euismod adipiscing ea suscipit exerci aliquip volutpat tation 
enim volutpat sit "},
+                       new Object [] {"url_95","sit suscipit oscillations 
ipsum nibh dolor ea dolore ea elit ipsum minim editors magna consectetuer 
ullamcorper commodo nonummy sit nostrud aliquip sit erat ullamcorper 
ullamcorper nibh veniam erat quis dolore nonummy "},
+                       new Object [] {"url_96","nostrud quis ut volutpat magna 
ad quis adipiscing Lorem commodo exerci laoreet magna adipiscing erat quis wisi 
ea ea laoreet enim convection ad dolor nisl amet nibh aliquam adipiscing 
tincidunt minim diam Lorem commodo adipiscing volutpat "},
+                       new Object [] {"url_97","laoreet laoreet suscipit 
nostrud dolore adipiscing volutpat Ut sed nisl diam ullamcorper ex ut ut dolor 
amet nostrud euismod dolore veniam veniam enim tation veniam ea minim minim 
volutpat tincidunt "},
+                       new Object [] {"url_98","quis lobortis amet wisi 
nostrud ipsum aliquam convection tincidunt dolore ullamcorper nibh lobortis 
volutpat ea nostrud oscillations minim nonummy enim ad lobortis exerci ipsum 
ullamcorper nibh nonummy diam amet enim veniam ut nostrud "},
+                       new Object [] {"url_99","aliquam wisi suscipit commodo 
diam amet amet magna nisl enim nostrud tation nisl nostrud nibh ut "}
+       };
+
+       /**
+        * Built-in rank rows used when no input file is supplied.
+        *
+        * <p>Every row is an {@code Object[]} of the form {@code {Integer, String, Integer}};
+        * {@code getRankDataSet(ExecutionEnvironment)} casts the three fields in exactly
+        * that order. The middle field is a {@code url_N} key.
+        *
+        * <p>NOTE(review): the two integers are presumably the page rank and the average
+        * visit duration -- confirm against the WebLogAnalysis example.
+        */
+       public static final Object [][] RANKS = {
+                       new Object [] {30,"url_0",43},
+                       new Object [] {82,"url_1",39},
+                       new Object [] {56,"url_2",31},
+                       new Object [] {96,"url_3",36},
+                       new Object [] {31,"url_4",36},
+                       new Object [] {29,"url_5",6},
+                       new Object [] {33,"url_6",48},
+                       new Object [] {66,"url_7",40},
+                       new Object [] {28,"url_8",51},
+                       new Object [] {9,"url_9",4},
+                       new Object [] {49,"url_10",24},
+                       new Object [] {26,"url_11",12},
+                       new Object [] {39,"url_12",46},
+                       new Object [] {84,"url_13",53},
+                       new Object [] {29,"url_14",50},
+                       new Object [] {21,"url_15",12},
+                       new Object [] {69,"url_16",34},
+                       new Object [] {11,"url_17",38},
+                       new Object [] {96,"url_18",13},
+                       new Object [] {56,"url_19",48},
+                       new Object [] {18,"url_20",36},
+                       new Object [] {31,"url_21",21},
+                       new Object [] {29,"url_22",11},
+                       new Object [] {71,"url_23",30},
+                       new Object [] {85,"url_24",48},
+                       new Object [] {19,"url_25",45},
+                       new Object [] {69,"url_26",9},
+                       new Object [] {20,"url_27",51},
+                       new Object [] {33,"url_28",46},
+                       new Object [] {75,"url_29",38},
+                       new Object [] {96,"url_30",51},
+                       new Object [] {73,"url_31",40},
+                       new Object [] {67,"url_32",16},
+                       new Object [] {24,"url_33",24},
+                       new Object [] {27,"url_34",35},
+                       new Object [] {33,"url_35",35},
+                       new Object [] {7,"url_36",22},
+                       new Object [] {83,"url_37",41},
+                       new Object [] {23,"url_38",49},
+                       new Object [] {41,"url_39",33},
+                       new Object [] {66,"url_40",38},
+                       new Object [] {4,"url_41",52},
+                       new Object [] {34,"url_42",4},
+                       new Object [] {28,"url_43",12},
+                       new Object [] {14,"url_44",14},
+                       new Object [] {41,"url_45",11},
+                       new Object [] {48,"url_46",37},
+                       new Object [] {75,"url_47",41},
+                       new Object [] {78,"url_48",3},
+                       new Object [] {63,"url_49",28}
+       };
+
+
+       /**
+        * Built-in visit rows used when no input file is supplied.
+        *
+        * <p>Every row is an {@code Object[]} of the form {@code {String, String}};
+        * {@code getVisitDataSet(ExecutionEnvironment)} casts both fields to {@code String}.
+        * The first field is a {@code url_N} key (the same key may occur in several rows),
+        * the second is a date written as year-month-day without zero padding,
+        * e.g. {@code "2003-11-5"}.
+        */
+       public static final Object [][] VISITS = {
+                       new Object [] {"url_2","2003-12-17"},
+                       new Object [] {"url_9","2008-11-11"},
+                       new Object [] {"url_14","2003-11-5"},
+                       new Object [] {"url_46","2009-2-16"},
+                       new Object [] {"url_14","2004-11-9"},
+                       new Object [] {"url_36","2001-3-9"},
+                       new Object [] {"url_35","2006-8-13"},
+                       new Object [] {"url_22","2008-1-18"},
+                       new Object [] {"url_36","2002-3-9"},
+                       new Object [] {"url_13","2007-7-17"},
+                       new Object [] {"url_23","2009-6-16"},
+                       new Object [] {"url_16","2000-7-15"},
+                       new Object [] {"url_41","2002-5-10"},
+                       new Object [] {"url_6","2004-11-9"},
+                       new Object [] {"url_5","2003-6-7"},
+                       new Object [] {"url_22","2002-11-5"},
+                       new Object [] {"url_11","2007-7-21"},
+                       new Object [] {"url_38","2009-12-2"},
+                       new Object [] {"url_6","2004-11-2"},
+                       new Object [] {"url_46","2000-6-4"},
+                       new Object [] {"url_34","2003-9-2"},
+                       new Object [] {"url_31","2008-2-24"},
+                       new Object [] {"url_0","2003-2-2"},
+                       new Object [] {"url_47","2003-7-8"},
+                       new Object [] {"url_49","2009-9-13"},
+                       new Object [] {"url_11","2003-4-2"},
+                       new Object [] {"url_20","2000-6-18"},
+                       new Object [] {"url_38","2000-2-22"},
+                       new Object [] {"url_44","2009-2-17"},
+                       new Object [] {"url_26","2000-6-21"},
+                       new Object [] {"url_13","2000-11-25"},
+                       new Object [] {"url_47","2005-4-19"},
+                       new Object [] {"url_46","2008-1-7"},
+                       new Object [] {"url_33","2004-12-24"},
+                       new Object [] {"url_32","2009-2-8"},
+                       new Object [] {"url_26","2000-9-21"},
+                       new Object [] {"url_9","2002-8-18"},
+                       new Object [] {"url_38","2002-11-27"},
+                       new Object [] {"url_37","2008-2-26"},
+                       new Object [] {"url_1","2007-3-22"},
+                       new Object [] {"url_37","2002-3-20"},
+                       new Object [] {"url_27","2008-11-12"},
+                       new Object [] {"url_30","2000-12-16"},
+                       new Object [] {"url_48","2000-12-17"},
+                       new Object [] {"url_46","2008-4-16"},
+                       new Object [] {"url_29","2006-3-9"},
+                       new Object [] {"url_0","2007-7-26"},
+                       new Object [] {"url_46","2009-12-15"},
+                       new Object [] {"url_34","2002-2-13"},
+                       new Object [] {"url_24","2009-3-1"},
+                       new Object [] {"url_43","2007-11-4"},
+                       new Object [] {"url_3","2004-2-16"},
+                       new Object [] {"url_26","2000-10-26"},
+                       new Object [] {"url_42","2004-7-14"},
+                       new Object [] {"url_13","2004-9-10"},
+                       new Object [] {"url_21","2000-2-21"},
+                       new Object [] {"url_9","2006-6-5"},
+                       new Object [] {"url_46","2001-12-17"},
+                       new Object [] {"url_24","2006-12-8"},
+                       new Object [] {"url_25","2006-9-2"},
+                       new Object [] {"url_37","2002-6-26"},
+                       new Object [] {"url_18","2006-6-2"},
+                       new Object [] {"url_46","2003-5-24"},
+                       new Object [] {"url_32","2000-10-17"},
+                       new Object [] {"url_45","2002-1-12"},
+                       new Object [] {"url_12","2005-12-13"},
+                       new Object [] {"url_49","2009-3-9"},
+                       new Object [] {"url_31","2001-9-19"},
+                       new Object [] {"url_22","2002-7-9"},
+                       new Object [] {"url_27","2005-2-3"},
+                       new Object [] {"url_43","2008-7-15"},
+                       new Object [] {"url_20","2000-3-23"},
+                       new Object [] {"url_25","2002-5-8"},
+                       new Object [] {"url_41","2004-4-27"},
+                       new Object [] {"url_17","2008-7-17"},
+                       new Object [] {"url_26","2009-12-16"},
+                       new Object [] {"url_34","2006-2-10"},
+                       new Object [] {"url_8","2009-4-14"},
+                       new Object [] {"url_16","2000-2-24"},
+                       new Object [] {"url_2","2009-2-10"},
+                       new Object [] {"url_35","2003-2-24"},
+                       new Object [] {"url_34","2008-3-16"},
+                       new Object [] {"url_27","2005-1-5"},
+                       new Object [] {"url_8","2008-12-10"},
+                       new Object [] {"url_38","2009-2-11"},
+                       new Object [] {"url_38","2006-11-3"},
+                       new Object [] {"url_47","2003-2-13"},
+                       new Object [] {"url_8","2008-11-17"},
+                       new Object [] {"url_26","2009-5-11"},
+                       new Object [] {"url_12","2007-11-26"},
+                       new Object [] {"url_10","2003-1-13"},
+                       new Object [] {"url_8","2005-9-23"},
+                       new Object [] {"url_42","2001-4-5"},
+                       new Object [] {"url_30","2009-12-10"},
+                       new Object [] {"url_2","2003-1-3"},
+                       new Object [] {"url_2","2009-2-19"},
+                       new Object [] {"url_7","2000-6-25"},
+                       new Object [] {"url_15","2004-9-26"},
+                       new Object [] {"url_25","2009-10-5"},
+                       new Object [] {"url_23","2009-8-9"},
+                       new Object [] {"url_27","2004-4-3"},
+                       new Object [] {"url_37","2008-6-9"},
+                       new Object [] {"url_9","2002-5-25"},
+                       new Object [] {"url_43","2009-5-18"},
+                       new Object [] {"url_21","2008-4-19"},
+                       new Object [] {"url_12","2001-12-25"},
+                       new Object [] {"url_16","2006-9-25"},
+                       new Object [] {"url_27","2002-1-2"},
+                       new Object [] {"url_2","2009-1-21"},
+                       new Object [] {"url_31","2009-3-20"},
+                       new Object [] {"url_42","2002-3-1"},
+                       new Object [] {"url_31","2001-11-26"},
+                       new Object [] {"url_20","2003-5-15"},
+                       new Object [] {"url_32","2004-1-22"},
+                       new Object [] {"url_28","2008-9-16"},
+                       new Object [] {"url_27","2006-7-3"},
+                       new Object [] {"url_11","2008-12-26"},
+                       new Object [] {"url_15","2004-8-16"},
+                       new Object [] {"url_34","2002-10-5"},
+                       new Object [] {"url_44","2000-2-15"},
+                       new Object [] {"url_9","2000-10-23"},
+                       new Object [] {"url_45","2005-4-24"},
+                       new Object [] {"url_0","2006-8-7"},
+                       new Object [] {"url_48","2003-8-7"},
+                       new Object [] {"url_8","2007-12-13"},
+                       new Object [] {"url_42","2003-8-2"},
+                       new Object [] {"url_25","2008-3-5"},
+                       new Object [] {"url_3","2007-3-9"},
+                       new Object [] {"url_49","2003-10-7"},
+                       new Object [] {"url_18","2007-12-6"},
+                       new Object [] {"url_3","2006-7-5"},
+                       new Object [] {"url_27","2000-9-14"},
+                       new Object [] {"url_42","2002-10-20"},
+                       new Object [] {"url_44","2007-1-13"},
+                       new Object [] {"url_6","2003-1-21"},
+                       new Object [] {"url_40","2009-10-20"},
+                       new Object [] {"url_28","2009-6-17"},
+                       new Object [] {"url_22","2000-2-17"},
+                       new Object [] {"url_3","2005-1-15"},
+                       new Object [] {"url_9","2008-12-9"},
+                       new Object [] {"url_9","2005-2-19"},
+                       new Object [] {"url_28","2000-4-22"},
+                       new Object [] {"url_44","2001-9-9"},
+                       new Object [] {"url_43","2008-6-21"},
+                       new Object [] {"url_39","2008-5-9"},
+                       new Object [] {"url_15","2006-9-15"},
+                       new Object [] {"url_23","2001-12-18"},
+                       new Object [] {"url_14","2002-5-23"},
+                       new Object [] {"url_11","2007-7-11"},
+                       new Object [] {"url_34","2000-12-8"},
+                       new Object [] {"url_47","2005-7-3"},
+                       new Object [] {"url_38","2004-3-26"},
+                       new Object [] {"url_19","2003-9-14"},
+                       new Object [] {"url_24","2007-7-16"},
+                       new Object [] {"url_40","2008-8-21"},
+                       new Object [] {"url_17","2007-12-4"},
+                       new Object [] {"url_25","2006-6-24"},
+                       new Object [] {"url_2","2000-10-8"},
+                       new Object [] {"url_12","2008-6-10"},
+                       new Object [] {"url_11","2004-11-24"},
+                       new Object [] {"url_13","2005-11-3"},
+                       new Object [] {"url_43","2005-1-2"},
+                       new Object [] {"url_14","2008-6-12"},
+                       new Object [] {"url_43","2001-8-27"},
+                       new Object [] {"url_45","2000-3-3"},
+                       new Object [] {"url_0","2006-9-27"},
+                       new Object [] {"url_22","2007-12-18"},
+                       new Object [] {"url_25","2006-4-4"},
+                       new Object [] {"url_32","2001-6-25"},
+                       new Object [] {"url_6","2007-6-9"},
+                       new Object [] {"url_8","2009-10-3"},
+                       new Object [] {"url_15","2003-2-23"},
+                       new Object [] {"url_37","2000-5-6"},
+                       new Object [] {"url_27","2004-3-21"},
+                       new Object [] {"url_17","2005-6-20"},
+                       new Object [] {"url_2","2004-2-27"},
+                       new Object [] {"url_36","2005-3-16"},
+                       new Object [] {"url_1","2009-12-3"},
+                       new Object [] {"url_9","2004-4-27"},
+                       new Object [] {"url_18","2009-5-26"},
+                       new Object [] {"url_31","2000-9-21"},
+                       new Object [] {"url_12","2008-9-25"},
+                       new Object [] {"url_2","2004-2-16"},
+                       new Object [] {"url_28","2008-11-12"},
+                       new Object [] {"url_28","2001-6-26"},
+                       new Object [] {"url_12","2006-3-15"},
+                       new Object [] {"url_0","2009-3-1"},
+                       new Object [] {"url_36","2006-10-13"},
+                       new Object [] {"url_15","2004-11-5"},
+                       new Object [] {"url_32","2008-2-11"},
+                       new Object [] {"url_19","2009-8-3"},
+                       new Object [] {"url_2","2006-8-6"},
+                       new Object [] {"url_11","2009-10-13"},
+                       new Object [] {"url_21","2002-9-14"},
+                       new Object [] {"url_18","2000-11-2"},
+                       new Object [] {"url_35","2006-5-15"},
+                       new Object [] {"url_11","2006-2-18"},
+                       new Object [] {"url_0","2001-4-25"},
+                       new Object [] {"url_14","2009-4-8"},
+                       new Object [] {"url_16","2009-4-7"}
+       };
+
+       /**
+        * Wraps the in-memory {@code DOCUMENTS} table in a {@link DataSet}.
+        *
+        * <p>Each row is converted to a {@code Tuple2<String, String>} by casting its
+        * first and second element to {@code String}; the tuples are collected in
+        * array order and handed to {@code env.fromCollection}.
+        *
+        * @param env execution environment that creates the data set
+        * @return data set holding one tuple per {@code DOCUMENTS} row
+        */
+       public static DataSet<Tuple2<String, String>> getDocumentDataSet(ExecutionEnvironment env) {
+               final List<Tuple2<String, String>> documents = new ArrayList<Tuple2<String, String>>(100);
+
+               for (int i = 0; i < DOCUMENTS.length; i++) {
+                       final Object[] row = DOCUMENTS[i];
+                       final String url = (String) row[0];
+                       final String text = (String) row[1];
+                       documents.add(new Tuple2<String, String>(url, text));
+               }
+
+               return env.fromCollection(documents);
+       }
+
+       /**
+        * Wraps the in-memory {@code RANKS} table in a {@link DataSet}.
+        *
+        * <p>Each row is converted to a {@code Tuple3<Integer, String, Integer>} by
+        * casting its three elements in order; the tuples are collected in array
+        * order and handed to {@code env.fromCollection}.
+        *
+        * @param env execution environment that creates the data set
+        * @return data set holding one tuple per {@code RANKS} row
+        */
+       public static DataSet<Tuple3<Integer, String, Integer>> getRankDataSet(ExecutionEnvironment env) {
+
+               // Size the list from the table itself instead of a hard-coded 100,
+               // so the capacity always matches the number of rows.
+               List<Tuple3<Integer, String, Integer>> data =
+                               new ArrayList<Tuple3<Integer, String, Integer>>(RANKS.length);
+               for (Object [] rank : RANKS) {
+                       data.add(new Tuple3<Integer, String, Integer>(
+                                       (Integer) rank[0], (String) rank[1], (Integer) rank[2]));
+               }
+               return env.fromCollection(data);
+       }
+
+       /**
+        * Wraps the in-memory {@code VISITS} table in a {@link DataSet}.
+        *
+        * <p>Each row is converted to a {@code Tuple2<String, String>} by casting its
+        * first and second element to {@code String}; the tuples are collected in
+        * array order and handed to {@code env.fromCollection}.
+        *
+        * @param env execution environment that creates the data set
+        * @return data set holding one tuple per {@code VISITS} row
+        */
+       public static DataSet<Tuple2<String, String>> getVisitDataSet(ExecutionEnvironment env) {
+
+               // Size the list from the table itself: the former fixed capacity of 100
+               // was smaller than the table and forced the backing array to grow.
+               List<Tuple2<String, String>> data =
+                               new ArrayList<Tuple2<String, String>>(VISITS.length);
+
+               for (Object [] visit : VISITS) {
+                       data.add(new Tuple2<String, String>((String) visit[0], (String) visit[1]));
+               }
+               return env.fromCollection(data);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java
 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java
new file mode 100644
index 0000000..8efa243
--- /dev/null
+++ 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.relational.util;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Random;
+
+import org.apache.flink.examples.java.relational.WebLogAnalysis;
+
+/**
+ * Data generator for the {@link WebLogAnalysis} example program. 
+ *
+ */
+public class WebLogDataGenerator {
+
+       /**
+        * Main method to generate data for the {@link WebLogAnalysis} example program.
+        * <p>
+        * The generator creates three files in the directory given by the
+        * <code>java.io.tmpdir</code> system property:
+        * <ul>
+        * <li><code>{tmp.dir}/documents</code> for the web documents
+        * <li><code>{tmp.dir}/ranks</code> for the ranks of the web documents
+        * <li><code>{tmp.dir}/visits</code> for the logged visits of web documents
+        * </ul>
+        * If fewer than two arguments are passed, a usage line is printed and the JVM
+        * exits with status 1.
+        *
+        * @param args
+        * <ol>
+        * <li>Int: Number of web documents
+        * <li>Int: Number of visits
+        * </ol>
+        */
+       public static void main(String[] args) {
+
+               // both counts are mandatory
+               if (args.length < 2) {
+                       System.out.println("WebLogDataGenerator <numberOfDocuments> <numberOfVisits>");
+                       System.exit(1);
+               }
+
+               final int documentCount = Integer.parseInt(args[0]);
+               final int visitCount = Integer.parseInt(args[1]);
+
+               // keywords that get mixed into the generated documents
+               final String[] keywords = { "editors", "oscillations", "convection" };
+
+               // vocabulary used to fill the documents
+               final String[] vocabulary = {
+                               "Lorem", "ipsum", "dolor", "sit", "amet",
+                               "consectetuer", "adipiscing", "elit", "sed", "diam", "nonummy",
+                               "nibh", "euismod", "tincidunt", "ut", "laoreet", "dolore",
+                               "magna", "aliquam", "erat", "volutpat", "Ut", "wisi", "enim",
+                               "ad", "minim", "veniam", "quis", "nostrud", "exerci", "tation",
+                               "ullamcorper", "suscipit", "lobortis", "nisl", "ut", "aliquip",
+                               "ex", "ea", "commodo" };
+
+               // all three relations are written next to each other in the temp directory
+               final String targetDir = System.getProperty("java.io.tmpdir");
+               final String documentsFile = targetDir + "/documents";
+               final String ranksFile = targetDir + "/ranks";
+               final String visitsFile = targetDir + "/visits";
+
+               System.out.println("Generating documents files...");
+               genDocs(documentCount, keywords, vocabulary, documentsFile);
+
+               System.out.println("Generating ranks files...");
+               genRanks(documentCount, ranksFile);
+
+               System.out.println("Generating visits files...");
+               genVisits(visitCount, documentCount, visitsFile);
+
+               System.out.println("Done!");
+       }
+
+       /**
+        * Generates the file for the documents relation. Every line has the
+        * following format: <br>
+        * <code>URL|Content|</code>
+        * <p>
+        * The URL is <code>url_i</code> for <code>i</code> from 0 to
+        * <code>noDocs - 1</code>. The content consists of 10 to 49 words, each
+        * followed by a single space. A word is drawn from
+        * <code>filterKeyWords</code> with a probability of about 10% and from
+        * <code>words</code> otherwise.
+        * <p>
+        * An {@link IOException} is not propagated to the caller; its stack trace is
+        * printed instead. The writer is closed in either case.
+        *
+        * @param noDocs
+        *            Number of entries for the documents relation
+        * @param filterKeyWords
+        *            A list of keywords that should be contained
+        * @param words
+        *            A list of words to fill the entries
+        * @param path
+        *            Output path for the documents relation
+        */
+       private static void genDocs(int noDocs, String[] filterKeyWords, String[] words, String path) {
+
+               Random rand = new Random(Calendar.getInstance().getTimeInMillis());
+
+               try {
+                       FileWriter fw = new FileWriter(path);
+                       try {
+                               for (int i = 0; i < noDocs; i++) {
+
+                                       int wordsInDoc = rand.nextInt(40) + 10;
+                                       // URL
+                                       StringBuilder doc = new StringBuilder("url_").append(i).append('|');
+                                       for (int j = 0; j < wordsInDoc; j++) {
+                                               if (rand.nextDouble() > 0.9) {
+                                                       // Approx. every 10th word is a keyword
+                                                       doc.append(filterKeyWords[rand.nextInt(filterKeyWords.length)]).append(' ');
+                                               } else {
+                                                       // Fills up the docs file(s) with random words
+                                                       doc.append(words[rand.nextInt(words.length)]).append(' ');
+                                               }
+                                       }
+                                       doc.append("|\n");
+
+                                       fw.write(doc.toString());
+                               }
+                       } finally {
+                               // release the file handle even if a write fails half way through
+                               fw.close();
+                       }
+
+               } catch (IOException e) {
+                       e.printStackTrace();
+               }
+       }
+
+       /**
+        * Generates the file for the ranks relation. Every line has the
+        * following format: <br>
+        * <code>Rank|URL|Average Duration|</code>
+        * <p>
+        * The rank is a random value between 0 and 99, the URL is
+        * <code>url_i</code> for <code>i</code> from 0 to <code>noDocs - 1</code>,
+        * and the average duration is the sum of two random values and lies
+        * between 0 and 58.
+        * <p>
+        * An {@link IOException} is not propagated to the caller; its stack trace is
+        * printed instead. The writer is closed in either case.
+        *
+        * @param noDocs
+        *            Number of entries in the documents relation
+        * @param path
+        *            Output path for the ranks relation
+        */
+       private static void genRanks(int noDocs, String path) {
+
+               Random rand = new Random(Calendar.getInstance().getTimeInMillis());
+
+               try {
+                       FileWriter fw = new FileWriter(path);
+                       try {
+                               for (int i = 0; i < noDocs; i++) {
+                                       // Rank
+                                       StringBuilder rank = new StringBuilder().append(rand.nextInt(100)).append('|');
+                                       // URL
+                                       rank.append("url_").append(i).append('|');
+                                       // Average duration (numeric sum, not a string concatenation)
+                                       int avgDuration = rand.nextInt(10) + rand.nextInt(50);
+                                       rank.append(avgDuration).append("|\n");
+
+                                       fw.write(rank.toString());
+                               }
+                       } finally {
+                               // release the file handle even if a write fails half way through
+                               fw.close();
+                       }
+
+               } catch (IOException e) {
+                       e.printStackTrace();
+               }
+       }
+
+       /**
+        * Generates the file for the visits relation. Every line has the
+        * following format:<br>
+        * <code>IP Address|URL|Date|Misc. Data (e.g. User-Agent)|</code>
+        * <p>
+        * The date is written as <code>year-month-day</code> without zero padding;
+        * the year lies between 2000 and 2009, the month between 1 and 12 and the
+        * day between 1 and 28. The URL refers to a random document
+        * <code>url_0</code> to <code>url_(noDocs - 1)</code>. The miscellaneous
+        * data is the same fixed text on every line.
+        * <p>
+        * An {@link IOException} is not propagated to the caller; its stack trace is
+        * printed instead. The writer is closed in either case.
+        *
+        * @param noVisits
+        *            Number of entries for the visits relation
+        * @param noDocs
+        *            Number of entries in the documents relation
+        * @param path
+        *            Output path for the visits relation
+        * @throws IllegalArgumentException
+        *             if visits are requested but <code>noDocs</code> is not positive
+        */
+       private static void genVisits(int noVisits, int noDocs, String path) {
+
+               // A visit needs a document to point to. Fail before the output file is
+               // created instead of letting Random.nextInt reject the bound later on.
+               if (noVisits > 0 && noDocs <= 0) {
+                       throw new IllegalArgumentException(
+                                       "Cannot generate visits without documents: noDocs = " + noDocs);
+               }
+
+               Random rand = new Random(Calendar.getInstance().getTimeInMillis());
+
+               try {
+                       FileWriter fw = new FileWriter(path);
+                       try {
+                               for (int i = 0; i < noVisits; i++) {
+
+                                       int year = 2000 + rand.nextInt(10); // year between 2000 and 2009
+                                       int month = rand.nextInt(12) + 1; // month between 1 and 12
+                                       int day = rand.nextInt(28) + 1; // day between 1 and 28, valid in every month
+
+                                       // IP address
+                                       StringBuilder visit = new StringBuilder()
+                                                       .append(rand.nextInt(256)).append('.')
+                                                       .append(rand.nextInt(256)).append('.')
+                                                       .append(rand.nextInt(256)).append('.')
+                                                       .append(rand.nextInt(256)).append('|');
+                                       // URL
+                                       visit.append("url_").append(rand.nextInt(noDocs)).append('|');
+                                       // Date (format: year-month-day, not zero-padded)
+                                       visit.append(year).append('-').append(month).append('-').append(day).append('|');
+                                       // Miscellaneous data, e.g. User-Agent
+                                       visit.append("0.12|Mozilla Firefox 3.1|de|de|Nothing special|124|\n");
+
+                                       fw.write(visit.toString());
+                               }
+                       } finally {
+                               // release the file handle even if a write fails half way through
+                               fw.close();
+                       }
+
+               } catch (IOException e) {
+                       e.printStackTrace();
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
new file mode 100644
index 0000000..3d88a6e
--- /dev/null
+++ 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.wordcount;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence 
histogram
+ * over text files. 
+ * 
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a simple Flink program.
+ * <li>use Tuple data types.
+ * <li>write and use user-defined functions. 
+ * </ul>
+ * 
+ */
+@SuppressWarnings("serial")
+public class WordCount {
+       
+       // 
*************************************************************************
+       //     PROGRAM
+       // 
*************************************************************************
+       
+       /**
+        * Builds and runs the word count program.
+        * <p>
+        * Returns without doing anything if the arguments cannot be parsed. With
+        * file output enabled the counts are written as CSV (one record per line,
+        * fields separated by a space) and the job is executed explicitly;
+        * otherwise the counts are printed.
+        *
+        * @param args either no arguments or <code>&lt;text path&gt; &lt;result path&gt;</code>
+        * @throws Exception if the program fails
+        */
+       public static void main(String[] args) throws Exception {
+
+               final boolean validArguments = parseParameters(args);
+               if (!validArguments) {
+                       return;
+               }
+
+               // set up the execution environment
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               // get input data
+               final DataSet<String> lines = getTextDataSet(env);
+
+               // split up the lines in pairs (2-tuples) containing: (word,1),
+               // then group by the tuple field "0" and sum up tuple field "1"
+               final DataSet<Tuple2<String, Integer>> wordCounts = lines
+                               .flatMap(new Tokenizer())
+                               .groupBy(0)
+                               .sum(1);
+
+               // emit result
+               if (!fileOutput) {
+                       wordCounts.print();
+                       return;
+               }
+
+               wordCounts.writeAsCsv(outputPath, "\n", " ");
+               // execute program
+               env.execute("WordCount Example");
+       }
+       
+       // 
*************************************************************************
+       //     USER FUNCTIONS
+       // 
*************************************************************************
+       
+       /**
+        * Implements the string tokenizer that splits sentences into words as a user-defined
+        * FlatMapFunction. The function takes a line (String) and splits it into
+        * multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
+        * <p>
+        * Lines are lower-cased with {@link java.util.Locale#ROOT}, so the produced words do
+        * not depend on the default locale of the JVM that runs the function.
+        */
+       public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+               /**
+                * Emits one "(word,1)" pair for every non-empty word of the given line.
+                *
+                * @param value the line to tokenize
+                * @param out the collector that receives the pairs
+                */
+               @Override
+               public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+                       // Normalize and split the line. The locale is fixed because the default
+                       // locale may map ASCII letters to non-ASCII ones (e.g. 'I' to a dotless
+                       // 'i' in Turkish), which "\\W" would then treat as a separator.
+                       String[] tokens = value.toLowerCase(java.util.Locale.ROOT).split("\\W+");
+
+                       // emit the pairs; a line that starts with a separator yields an empty
+                       // first token, which is skipped
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Tuple2<String, Integer>(token, 1));
+                               }
+                       }
+               }
+       }
+       
+       // 
*************************************************************************
+       //     UTIL METHODS
+       // 
*************************************************************************
+       
+       private static boolean fileOutput = false;
+       private static String textPath;
+       private static String outputPath;
+       
+       /**
+        * Evaluates the command line arguments and stores the result in the static
+        * fields <code>fileOutput</code>, <code>textPath</code> and <code>outputPath</code>.
+        * <p>
+        * Without arguments a hint is printed and the fields are left untouched. With
+        * any arguments <code>fileOutput</code> is enabled, and exactly two arguments
+        * are required: the text path followed by the result path.
+        *
+        * @param args the command line arguments of the program
+        * @return <code>false</code> if arguments were given but their number is not two,
+        *         <code>true</code> otherwise
+        */
+       private static boolean parseParameters(String[] args) {
+
+               // no arguments: keep the defaults and tell the user how to pass files
+               if (args.length == 0) {
+                       System.out.println("Executing WordCount example with built-in default data.");
+                       System.out.println("  Provide parameters to read input data from a file.");
+                       System.out.println("  Usage: WordCount <text path> <result path>");
+                       return true;
+               }
+
+               // any argument switches the program to file mode
+               fileOutput = true;
+
+               if (args.length != 2) {
+                       System.err.println("Usage: WordCount <text path> <result path>");
+                       return false;
+               }
+
+               textPath = args[0];
+               outputPath = args[1];
+               return true;
+       }
+       
+       /**
+        * Provides the input lines of the program.
+        *
+        * @param env the execution environment used to create the data set
+        * @return the lines of the file at <code>textPath</code> if <code>fileOutput</code>
+        *         is set, the default data of {@link WordCountData} otherwise
+        */
+       private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
+               if (!fileOutput) {
+                       // get default test text data
+                       return WordCountData.getDefaultTextLineDataSet(env);
+               }
+
+               // read the text file from given input path
+               return env.readTextFile(textPath);
+       }
+}

Reply via email to