http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/InfiniteIntegerInputFormatWithDelay.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/InfiniteIntegerInputFormatWithDelay.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/InfiniteIntegerInputFormatWithDelay.java
deleted file mode 100644
index f1de48d..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/InfiniteIntegerInputFormatWithDelay.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.util;
-
-import java.io.IOException;
-
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-public class InfiniteIntegerInputFormatWithDelay extends GenericInputFormat {
-       private static final long serialVersionUID = 1L;
-       
-       private static final int DELAY = 20;
-       
-       private final IntValue one = new IntValue(1);
-       
-
-       @Override
-       public boolean reachedEnd() throws IOException {
-               return false;
-       }
-
-
-       @Override
-       public Record nextRecord(Record record) throws IOException {
-               record.setField(0, this.one);
-               
-               try {
-                       Thread.sleep(DELAY);
-               } catch (InterruptedException iex) {
-                       // do nothing
-               }
-               
-               return record;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/IntTupleDataInFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/IntTupleDataInFormat.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/IntTupleDataInFormat.java
deleted file mode 100644
index e5d32c6..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/IntTupleDataInFormat.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.util;
-
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-public class IntTupleDataInFormat extends DelimitedInputFormat {
-       private static final long serialVersionUID = 1L;
-
-       public static final int MAX_COLUMNS = 20;
-
-       public static final int DELIMITER = '|';
-       
-       private final IntValue key = new IntValue();
-       private final int[] offsets = new int[MAX_COLUMNS];
-
-       @Override
-       public Record readRecord(Record target, byte[] line, int offset, int 
numBytes) {
-               final int limit = offset + numBytes;
-               int readPos = offset;
-
-               // allocate the offsets array
-               final int[] offsets = this.offsets;
-               offsets[0] = offset;
-
-               int col = 1; // the column we are in
-
-               while (readPos < limit) {
-                       if (line[readPos++] == DELIMITER) {
-                               offsets[col++] = readPos;
-                       }
-               }
-
-               final Tuple value = new Tuple(line, offsets, col - 1);
-               this.key.setValue((int) value.getLongValueAt(0));
-               
-               target.setField(0, this.key);
-               target.setField(1, value);
-               return target;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/StringTupleDataOutFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/StringTupleDataOutFormat.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/StringTupleDataOutFormat.java
deleted file mode 100644
index e6b8548..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/StringTupleDataOutFormat.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.util;
-
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-
-public class StringTupleDataOutFormat extends DelimitedOutputFormat {
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public int serializeRecord(Record rec, byte[] target) throws Exception {
-               String string = rec.getField(0, StringValue.class).toString();
-               byte[] stringBytes = string.getBytes();
-               Tuple tuple = rec.getField(1, Tuple.class);
-               String tupleStr = tuple.toString();
-               byte[] tupleBytes = tupleStr.getBytes();
-               int totalLength = stringBytes.length + 1 + tupleBytes.length;
-               if(target.length >= totalLength) {
-                       System.arraycopy(stringBytes, 0, target, 0, 
stringBytes.length);
-                       target[stringBytes.length] = '|';
-                       System.arraycopy(tupleBytes, 0, target, 
stringBytes.length + 1, tupleBytes.length);
-                       return totalLength;
-               } else {
-                       return -1 * totalLength;
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/Tuple.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/Tuple.java 
b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/Tuple.java
deleted file mode 100644
index 064f15e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/Tuple.java
+++ /dev/null
@@ -1,615 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.util;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-
-public class Tuple implements Value {
-       private static final long serialVersionUID = 1L;
-
-       private byte[] bytes;
-
-       private int[] offsets;
-
-       private int numCols;
-
-       /**
-        * Instantiates an empty tuple.
-        */
-       public Tuple() {
-               numCols = 0;
-       }
-
-       /**
-        * Creates a new tuple with a given set of attributes.
-        * 
-        * @param bytes
-        *        The bytes array. Attributes are separated by a single 
character. The last attribute
-        *        is also terminated with a single character. 
-        * @param offsets
-        *        The offsets of the columns in the byte array. The last entry 
gives the offset of the terminating
-        *        character + 1 (if the byte array exactly holds all attributes 
and delimiting characters this is 
-        *        the length of the array).
-        * @param cols
-        *        The number of columns.
-        */
-       public Tuple(byte[] bytes, int[] offsets, int cols) {
-               this.bytes = bytes;
-               this.offsets = offsets;
-               this.numCols = cols;
-       }
-
-       // 
------------------------------------------------------------------------
-       // Accessors
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Returns the number of attributes / columns of the tuple.
-        * 
-        * @return The number of columns of the tuple.
-        */
-       public int getNumberOfColumns() {
-               return numCols;
-       }
-
-       /**
-        * Returns the internal byte array of the tuple.
-        * 
-        * @return The internal byte array of the tuple.
-        */
-       public byte[] getBytes() {
-               return bytes;
-       }
-
-       /**
-        * Returns the length of the column with the specified index. Column 
indices start at 0.
-        * 
-        * @param colNumber Index of the column. Indices start at 0.
-        * @return The length of the specified column.
-        */
-       public int getColumnLength(int colNumber) {
-               if(offsets == null) return -1;
-               if(colNumber < 0) return -1;
-               if(colNumber >= offsets.length) return -1;
-               return offsets[colNumber + 1] - offsets[colNumber] - 1;
-       }
-
-       // 
------------------------------------------------------------------------
-       // Modification
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Appends all columns of the specified tuple to this tuple.
-        * 
-        * @param other The tuple whose columns are appended to this tuple.
-        */
-       public void concatenate(Tuple other) {
-               
-               if(other.getBytes() == null) return;
-               
-               if (bytes == null) {
-                       bytes = (byte[]) other.bytes.clone();
-                       offsets = (int[]) other.offsets.clone();
-                       numCols = other.numCols;
-               } else {
-                       int len = offsets[numCols];
-                       int otherLen = other.offsets[other.numCols];
-                       int totalLen = len + otherLen;
-
-                       // bytes:
-                       // our content
-                       if (bytes.length < totalLen) {
-                               byte[] tmp = new byte[totalLen];
-                               System.arraycopy(bytes, 0, tmp, 0, len);
-                               bytes = tmp;
-                       }
-
-                       // the other's content
-                       System.arraycopy(other.bytes, 0, bytes, len, otherLen);
-
-                       // offsets
-                       if (offsets.length < numCols + other.numCols + 1) {
-                               int[] tmp = new int[numCols + other.numCols + 
1];
-                               System.arraycopy(offsets, 0, tmp, 0, numCols + 
1);
-                               offsets = tmp;
-                       }
-
-                       // other offsets
-                       for (int i = 1; i < other.numCols + 1; i++) {
-                               offsets[numCols + i] = other.offsets[i] + len;
-                       }
-
-                       numCols += other.numCols;
-               }
-       }
-
-       /**
-        * Performs a projection on the tuple.
-        * The int parameter is interpreted as a bitmap on the columns.
-        * I.e. a bitmap value of 1 projects to the first column, 2 to the 
second, 3 to the first two columns, and so on.
-        * 
-        * @param bitmap the projection bitmap.
-        */
-       public void project(int bitmap) {
-               int[] lengths = new int[numCols];
-               int lenCount = 0;
-
-               if(bytes == null || offsets == null) return;
-               
-               // go through the bitmap and find the indexes of the columns to 
retain
-               int k = 0;
-               for (int i = 0; bitmap != 0 && i < numCols; i++, bitmap >>>= 1) 
{
-                       if ((bitmap & 0x1) != 0) {
-                               int len = offsets[i + 1] - offsets[i];
-                               lengths[k] = len;
-                               lenCount += len;
-                               offsets[k] = offsets[i];
-                               k++;
-                       }
-               }
-               numCols = k;
-
-               // allocate the new (smaller) array
-               byte[] tmp = new byte[lenCount];
-               lenCount = 0;
-
-               // copy the columns to the beginning and adjust the offsets to 
the new array
-               for (int i = 0; i < k; i++) {
-                       System.arraycopy(bytes, offsets[i], tmp, lenCount, 
lengths[i]);
-                       offsets[i] = lenCount;
-                       lenCount += lengths[i];
-               }
-
-               bytes = tmp;
-               offsets[numCols] = tmp.length;
-       }
-
-       /**
-        * Compares a String attribute of this tuple with a String attribute of 
another tuple.
-        * The strings are compared lexicographic.
-        * 
-        * @param other The other tuple.
-        * @param thisColumn The index of this tuple's String attribute.
-        * @param otherColumn The index of the other tuple's String attribute.
-        * @return 1 if this tuple's attribute is greater, 0 if both attributes 
have the same value, 
-        *         -1 if this tuple's attribute is smaller.
-        * @throws IndexOutOfBoundsException Thrown if one of the column 
indices is invalid (smaller than 0 or bigger 
-        *            than the attribute count).
-        */
-       public int compareStringAttribute(Tuple other, int thisColumn, int 
otherColumn) {
-               
-               if(thisColumn < 0) throw new IndexOutOfBoundsException();
-               if(otherColumn < 0) throw new IndexOutOfBoundsException();
-               if(thisColumn >= numCols) throw new IndexOutOfBoundsException();
-               if(otherColumn >= other.numCols) throw new 
IndexOutOfBoundsException();
-               
-               int len = getColumnLength(thisColumn);
-               int otherLen = other.getColumnLength(otherColumn);
-               int min = Math.min(len, otherLen);
-
-               int startPos = offsets[thisColumn];
-               int otherStartPos = other.offsets[otherColumn];
-               
-               for (int i = 0; i < min; i++) {
-                       if (bytes[startPos + i] < other.bytes[otherStartPos + 
i]) {
-                               return -1;
-                       } else if (bytes[startPos + i] > 
other.bytes[otherStartPos + i]) {
-                               return 1;
-                       }
-               }
-
-               if (len < otherLen) {
-                       return -1;
-               } else if (len > otherLen) {
-                       return 1;
-               } else {
-                       return 0;
-               }
-       }
-
-       /**
-        * Compares an Integer attribute of this tuple with an Integer 
attribute of another tuple.
-        * 
-        * @param other The other tuple.
-        * @param thisColumn The index of this tuple's int attribute.
-        * @param otherColumn The index of the other tuple's int attribute.
-        * @return 1 if this tuple's attribute is greater, 0 if both attributes 
have the same value, 
-        *         -1 if this tuple's attribute is smaller.
-        * @throws IndexOutOfBoundsException Thrown if one of the column 
indices is invalid (smaller than 0 or bigger 
-        *            than the attribute count).
-        */
-       public int compareIntAttribute(Tuple other, int thisColumn, int 
otherColumn) {
-               int len = getColumnLength(thisColumn);
-               int otherLen = other.getColumnLength(otherColumn);
-
-               if(thisColumn < 0) throw new IndexOutOfBoundsException();
-               if(otherColumn < 0) throw new IndexOutOfBoundsException();
-               if(thisColumn >= numCols) throw new IndexOutOfBoundsException();
-               if(otherColumn >= other.numCols) throw new 
IndexOutOfBoundsException();
-               
-               short thisNegative = 1;
-               short otherNegative = 1;
-               
-               if(this.bytes[offsets[thisColumn]] == '-') {
-                       thisNegative = -1;
-               }
-               
-               if(other.getBytes()[other.offsets[otherColumn]] == '-') {
-                       otherNegative = -1;
-               }
-               
-               // check one int is negative
-               if(thisNegative != otherNegative) {
-                       return thisNegative;
-               }
-
-               // check if they vary in length
-               if (len < otherLen) {
-                       return -1 * thisNegative;
-               } else if (len > otherLen) {
-                       return 1 * thisNegative;
-               }
-
-               // both have the same orientation and length, check digit-wise
-               int myStartPos = offsets[thisColumn];
-               int compStartPos = other.offsets[otherColumn];
-
-               for (int i = 0; i < len; i++) {
-                       if (bytes[myStartPos + i] < other.bytes[compStartPos + 
i]) {
-                               return -1 * thisNegative;
-                       } else if (bytes[myStartPos + i] > 
other.bytes[compStartPos + i]) {
-                               return 1 * thisNegative;
-                       }
-               }
-               return 0;
-
-       }
-
-       /**
-        * Returns the String value of the attribute with the specified index.
-        * 
-        * @param column The index of the attribute whose String value is 
returned.
-        * @return The String value of the specified attribute.
-        * @throws IndexOutOfBoundsException Thrown if the index of the column 
is invalid (smaller than 0 or bigger 
-        *            than the attribute count).
-        */
-       public String getStringValueAt(int column) throws 
IndexOutOfBoundsException {
-               // check for validity of column index
-               if(column < 0) throw new IndexOutOfBoundsException();
-               if(column >= numCols) throw new IndexOutOfBoundsException();
-               
-               int off = offsets[column];
-               int len = getColumnLength(column);
-
-               char[] chars = new char[len];
-               for (int i = 0; i < len; i++) {
-                       chars[i] = (char) (bytes[off + i] & 0xff);
-               }
-
-               return new String(chars);
-       }
-
-       /**
-        * Returns the Long value of the attribute with the specified index.
-        * The value must be represented in the decimal system.
-        *  
-        * @param column The index of the attribute whose value is returned as 
long.
-        * @return The long value of the specified attribute.
-        * @throws IndexOutOfBoundsException Thrown if the index of the column 
is invalid (smaller than 0 or bigger 
-        *            than the attribute count).
-        * @throws NumberFormatException Thrown if the attribute is not a valid 
long value 
-        *            (contains any other character than digits or '-'.)
-        */
-       public long getLongValueAt(int column) throws 
IndexOutOfBoundsException, NumberFormatException {
-               
-               if(column < 0) throw new IndexOutOfBoundsException();
-               if(column >= numCols) throw new IndexOutOfBoundsException();
-               
-               int off = offsets[column];
-               int len = getColumnLength(column);
-
-               boolean isNegative = false;
-
-               if(bytes[off] == '-') {
-                       isNegative = true;
-                       off++;
-                       len--;
-               }
-               
-               long value = 0;
-               for (int i = off; i < off + len; i++) {
-                       
-                       if(bytes[i] < '0' || bytes[i] > '9') throw new 
NumberFormatException();
-                       
-                       value *= 10;
-                       value += (bytes[i] - 48);
-               }
-               
-               if(isNegative) {
-                       value *= -1;
-               }
-
-               return value;
-       }
-
-       /**
-        * Returns an attribute which is specified by an index as byte array. 
-        * 
-        * @param column The index of the attribute which is returned as byte 
array.
-        * @return The value of the specified attribute as byte array value.
-        * @throws IndexOutOfBoundsException Thrown if the index of the column 
is invalid (smaller than 0 or bigger 
-        *            than the attribute count).
-        */
-       public byte[] getByteArrayValueAt(int column) throws 
IndexOutOfBoundsException {
-               
-               if(column < 0) throw new IndexOutOfBoundsException();
-               if(column >= numCols) throw new IndexOutOfBoundsException();
-               
-               int len = getColumnLength(column);
-               byte[] buffer = new byte[len];
-               System.arraycopy(bytes, offsets[column], buffer, 0, len);
-               return buffer;
-       }
-
-       /**
-        * Sets the size of the internal byte array of the tuple to the minimum 
capacity.
-        * If the minimum capacity is smaller than the current size of the 
tuple's byte array,
-        * nothing is done. Otherwise a new byte array is allocated and the 
content of the old one copied.
-        * 
-        * @param minCapacity The new size of the internal byte array. 
-        */
-       public void reserveSpace(int minCapacity) {
-               if (bytes.length < minCapacity) {
-                       byte[] tmp = new byte[minCapacity];
-                       System.arraycopy(bytes, 0, tmp, 0, offsets[numCols]);
-                       bytes = tmp;
-               }
-       }
-
-       /**
-        * Reduces the size of the internal byte and offset arrays to the 
currently used size.
-        */
-       public void compact() {
-               int len = offsets[numCols];
-
-               if (bytes.length > len) {
-                       byte[] tmp = new byte[len];
-                       System.arraycopy(bytes, 0, tmp, 0, len);
-                       bytes = tmp;
-               }
-
-               if (offsets.length > numCols + 1) {
-                       int[] tmp = new int[numCols + 1];
-                       System.arraycopy(offsets, 0, tmp, 0, numCols + 1);
-                       offsets = tmp;
-               }
-       }
-
-       /**
-        * Appends an attribute at the end of the tuple.
-        * 
-        * @param attValue The attribute to append.
-        */
-       public void addAttribute(byte[] attValue) {
-               int end;
-
-               if (numCols == 0) {
-                       offsets = new int[5];
-                       bytes = new byte[Math.max(256, attValue.length + 1)];
-                       end = 0;
-               } else {
-                       end = offsets[numCols];
-
-                       // increase offset array, if necessary
-                       if (numCols + 1 >= offsets.length) {
-                               int[] tmp = new int[offsets.length * 2];
-                               System.arraycopy(offsets, 0, tmp, 0, numCols + 
1);
-                               offsets = tmp;
-                       }
-
-                       // increase byte buffer, if necessary
-                       if (bytes.length < end + attValue.length + 1) {
-                               byte[] tmp = new byte[bytes.length + 
attValue.length + 1];
-                               System.arraycopy(bytes, 0, tmp, 0, end);
-                               bytes = tmp;
-                       }
-               }
-
-               // copy bytes, offsets and increase columns
-               System.arraycopy(attValue, 0, bytes, end, attValue.length);
-               end += attValue.length;
-               bytes[end++] = '|';
-               numCols++;
-               offsets[numCols] = end;
-       }
-
-       /**
-        * Appends an attribute at the end of the tuple.
-        * 
-        * @param attValue The attribute to append.
-        */
-       public void addAttribute(String attValue) {
-               int end;
-
-               if (numCols == 0) {
-                       offsets = new int[5];
-                       bytes = new byte[Math.max(256, attValue.length() + 1)];
-                       end = 0;
-               } else {
-                       end = offsets[numCols];
-
-                       // increase offset array, if necessary
-                       if (numCols + 1 >= offsets.length) {
-                               int[] tmp = new int[offsets.length * 2];
-                               System.arraycopy(offsets, 0, tmp, 0, numCols + 
1);
-                               offsets = tmp;
-                       }
-
-                       // increase byte buffer, if necessary
-                       if (bytes.length < end + attValue.length() + 1) {
-                               byte[] tmp = new byte[bytes.length + 
attValue.length() + 1];
-                               System.arraycopy(bytes, 0, tmp, 0, end);
-                               bytes = tmp;
-                       }
-               }
-
-               // copy bytes, offsets and increase columns
-               for (int i = 0; i < attValue.length(); i++, end++) {
-                       bytes[end] = (byte) (attValue.charAt(i) & 0xff);
-               }
-               bytes[end++] = '|';
-               numCols++;
-               offsets[numCols] = end;
-       }
-       
-       /**
-        * Appends an attribute by copying it from another tuple.
-        * 
-        * @param other The other tuple to copy from.
-        * @param column The index of the attribute to copy within the other 
tuple.
-        */
-       public void addAttributeFromKVRecord(Tuple other, int column) {
-               int len = other.getColumnLength(column) + 1;
-               int end;
-
-               if (numCols == 0) {
-                       offsets = new int[5];
-                       bytes = new byte[Math.max(256, len)];
-                       end = 0;
-               } else {
-                       end = offsets[numCols];
-
-                       // increase offset array, if necessary
-                       if (numCols + 1 >= offsets.length) {
-                               int[] tmp = new int[offsets.length * 2];
-                               System.arraycopy(offsets, 0, tmp, 0, numCols + 
1);
-                               offsets = tmp;
-                       }
-
-                       // increase byte buffer, if necessary
-                       if (bytes.length < end + len) {
-                               byte[] tmp = new byte[end + len];
-                               System.arraycopy(bytes, 0, tmp, 0, end);
-                               bytes = tmp;
-                       }
-               }
-
-               System.arraycopy(other.bytes, other.offsets[column], bytes, 
end, len);
-               numCols++;
-               offsets[numCols] = end + len;
-       }
-       
-       public void setContents(byte[] bytes, int offset, int len, char 
delimiter)
-       {
-               // make space
-               if (this.bytes == null || this.bytes.length < len) {
-                       this.bytes = new byte[len];
-               }
-               
-               // copy the binary data
-               System.arraycopy(bytes, offset, this.bytes, 0, len);
-               
-               int readPos = offset;
-               
-               // allocate the offsets array
-               if (this.offsets == null) {
-                       this.offsets = new int[4];
-               }
-
-               int col = 1; // the column we are in
-
-               int startPos = readPos;
-
-               while (readPos < offset + len) {
-                       if (bytes[readPos++] == delimiter) {
-                               if (offsets.length <= col) {
-                                       int newOffsets[] = new 
int[this.offsets.length * 2];
-                                       System.arraycopy(this.offsets, 0, 
newOffsets, 0, this.offsets.length);
-                                       this.offsets = newOffsets;
-                               }
-                               this.offsets[col++] = readPos - startPos;
-                       }
-               }
-               
-               this.numCols = col - 1;
-       }
-
-
-       // 
------------------------------------------------------------------------
-       // Serialization
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void read(DataInputView in) throws IOException {
-               // read the bytes
-               int numBytes = in.readInt();
-               if (numBytes > 0) {
-                       bytes = new byte[numBytes];
-                       in.readFully(bytes);
-
-                       // read the offsets
-                       numCols = in.readInt() + 1;
-                       offsets = new int[numCols + 1];
-                       for (int i = 1; i < numCols; i++) {
-                               offsets[i] = in.readInt();
-                       }
-                       // set last offset
-                       offsets[numCols] = numBytes;
-               } else {
-                       numCols = 0;
-               }
-       }
-
-       @Override
-       public void write(DataOutputView out) throws IOException {
-               // write the bytes
-               int numBytes = (numCols > 0 ? offsets[numCols] : 0);
-               out.writeInt(numBytes);
-               if (numBytes > 0) {
-                       out.write(bytes, 0, numBytes);
-
-                       // write the offsets
-                       // exclude first and last
-                       out.writeInt(numCols - 1);
-                       for (int i = 1; i < numCols; i++) {
-                               out.writeInt(offsets[i]);
-                       }
-               }
-       }
-
-       @Override
-       public String toString() {
-               StringBuilder bld = new StringBuilder();
-
-               for (int i = 0; i < numCols; i++) {
-                       for (int k = 0; k < getColumnLength(i); k++) {
-                               bld.append((char) (bytes[offsets[i] + k] & 
0xff));
-                       }
-                       bld.append('|');
-               }
-
-               return bld.toString();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/UniformIntInput.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/UniformIntInput.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/UniformIntInput.java
deleted file mode 100644
index 5e880cb..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/UniformIntInput.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.util;
-
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-/**
- * 
- */
-public class UniformIntInput extends GenericInputFormat {
-       private static final long serialVersionUID = 1L;
-       
-       public static final String NUM_KEYS_KEY = "testfomat.numkeys";
-       public static final String NUM_VALUES_KEY = "testfomat.numvalues";
-       
-       private static final int DEFAULT_NUM_KEYS = 1000;
-       private static final int DEFAULT_NUM_VALUES = 1000;
-       
-       private final IntValue key = new IntValue();
-       private final IntValue value = new IntValue();
-       
-       private int numKeys;
-       private int numValues;
-       
-       private int keyInt;
-       private int valueInt;
-
-       public UniformIntInput() {
-               this(DEFAULT_NUM_KEYS, DEFAULT_NUM_VALUES);
-       }
-       
-       public UniformIntInput(final int numKeys, final int numValues) {
-               this.numKeys = numKeys;
-               this.numValues = numValues;
-       }
-       
-       
-
-
-       @Override
-       public void configure(Configuration parameters) {
-               super.configure(parameters);
-               
-               this.numKeys = parameters.getInteger(NUM_KEYS_KEY, this.numKeys);
-               this.numValues = parameters.getInteger(NUM_VALUES_KEY, this.numValues);
-       }
-
-
-       @Override
-       public boolean reachedEnd() {
-               return this.valueInt >= this.numValues;
-       }
-
-
-       @Override
-       public Record nextRecord(Record record) {
-               if (this.keyInt == this.numKeys) {
-                       this.keyInt = 0;
-                       this.valueInt++;
-               }
-
-               this.key.setValue(this.keyInt);
-               this.value.setValue(this.valueInt);
-               
-               record.setField(0, this.key);
-               record.setField(1, this.value);
-               record.updateBinaryRepresenation();
-               
-               this.keyInt++;
-               
-               return record;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
deleted file mode 100644
index 2fcc523..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.wordcount;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-import java.util.Iterator;
-import java.util.StringTokenizer;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * the occurrences of each word in the file.
- *
- * <br><br>
- *
- * <b>Note</b>: This example uses the out-dated Record API.
- * It is recommended to use the new Java API.
- *
- */
-@SuppressWarnings("deprecation")
-public class WordCount implements Program, ProgramDescription {
-       
-       private static final long serialVersionUID = 1L;
-
-
-       /**
-        * Converts a Record containing one string in to multiple string/integer pairs.
-        * The string is tokenized by whitespaces. For each token a new record is emitted,
-        * where the token is the first field and an Integer(1) is the second field.
-        */
-       public static class TokenizeLine extends MapFunction {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void map(Record record, Collector<Record> collector) {
-                       // get the first field (as type StringValue) from the record
-                       String line = record.getField(0, StringValue.class).getValue();
-
-                       // normalize the line
-                       line = line.replaceAll("\\W+", " ").toLowerCase();
-                       
-                       // tokenize the line
-                       StringTokenizer tokenizer = new StringTokenizer(line);
-                       while (tokenizer.hasMoreTokens()) {
-                               String word = tokenizer.nextToken();
-                               
-                               // we emit a (word, 1) pair 
-                               collector.collect(new Record(new StringValue(word), new IntValue(1)));
-                       }
-               }
-       }
-
-       /**
-        * Sums up the counts for a certain given key. The counts are assumed to be at position <code>1</code>
-        * in the record. The other fields are not modified.
-        */
-       @Combinable
-       @ConstantFields(0)
-       public static class CountWords extends ReduceFunction {
-               
-               private static final long serialVersionUID = 1L;
-               
-               @Override
-               public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-                       Record element = null;
-                       int sum = 0;
-                       while (records.hasNext()) {
-                               element = records.next();
-                               int cnt = element.getField(1, IntValue.class).getValue();
-                               sum += cnt;
-                       }
-
-                       element.setField(1, new IntValue(sum));
-                       out.collect(element);
-               }
-               
-               @Override
-               public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
-                       // the logic is the same as in the reduce function, so simply call the reduce method
-                       reduce(records, out);
-               }
-       }
-
-
-       @Override
-       public Plan getPlan(String... args) {
-               // parse job parameters
-               int numSubTasks   = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-               String dataInput = (args.length > 1 ? args[1] : "");
-               String output    = (args.length > 2 ? args[2] : "");
-
-               FileDataSource source = new FileDataSource(new TextInputFormat(), dataInput, "Input Lines");
-               MapOperator mapper = MapOperator.builder(new TokenizeLine())
-                       .input(source)
-                       .name("Tokenize Lines")
-                       .build();
-               ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
-                       .input(mapper)
-                       .name("Count Words")
-                       .build();
-               
-               @SuppressWarnings("unchecked")
-               FileDataSink out = new FileDataSink(new CsvOutputFormat("\n", " ", StringValue.class, IntValue.class), output, reducer, "Word Counts");
-               
-               Plan plan = new Plan(out, "WordCount Example");
-               plan.setDefaultParallelism(numSubTasks);
-               return plan;
-       }
-
-       @Override
-       public String getDescription() {
-               return "Parameters: <numSubStasks> <input> <output>";
-       }
-
-       
-       public static void main(String[] args) throws Exception {
-               WordCount wc = new WordCount();
-               
-               if (args.length < 3) {
-                       System.err.println(wc.getDescription());
-                       System.exit(1);
-               }
-               
-               Plan plan = wc.getPlan(args);
-               
-               // This will execute the word-count embedded in a local context. replace this line by the commented
-               // succeeding line to send the job to a local installation or to a cluster for execution
-               JobExecutionResult result = LocalExecutor.execute(plan);
-               System.err.println("Total runtime: " + result.getNetRuntime(TimeUnit.MILLISECONDS) + " ms");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCountAccumulators.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCountAccumulators.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCountAccumulators.java
deleted file mode 100644
index 780db58..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCountAccumulators.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.wordcount;
-
-import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.StringTokenizer;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.Histogram;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * This is similar to the WordCount example and additionally demonstrates how to
- * use custom accumulators (built-in or custom).
- */
-@SuppressWarnings("deprecation")
-public class WordCountAccumulators implements Program, ProgramDescription {
-       
-       private static final long serialVersionUID = 1L;
-
-       public static class TokenizeLine extends MapFunction implements Serializable {
-               private static final long serialVersionUID = 1L;
-
-               // For efficiency it is recommended to have member variables for the accumulators
-               public static final String ACCUM_NUM_LINES = "accumulator.num-lines";
-               private LongCounter numLines = new LongCounter();
-
-               // This histogram accumulator collects the distribution of number of words per line
-               public static final String ACCUM_WORDS_PER_LINE = "accumulator.words-per-line";
-               private Histogram wordsPerLine = new Histogram();
-
-               public static final String ACCUM_DISTINCT_WORDS = "accumulator.distinct-words";
-               private SetAccumulator<StringValue> distinctWords = new SetAccumulator<StringValue>();
-
-               
-               @Override
-               public void open(Configuration parameters) throws Exception {
-
-                       // Accumulators have to be registered to the system
-                       getRuntimeContext().addAccumulator(ACCUM_NUM_LINES, this.numLines);
-                       getRuntimeContext().addAccumulator(ACCUM_WORDS_PER_LINE, this.wordsPerLine);
-                       getRuntimeContext().addAccumulator(ACCUM_DISTINCT_WORDS, this.distinctWords);
-
-                       // You could also write to accumulators in open() or close()
-               }
-
-               @Override
-               public void map(Record record, Collector<Record> collector) {
-                       
-                       // Increment counter
-                       numLines.add(1L);
-                                               
-                       // get the first field (as type StringValue) from the record
-                       String line = record.getField(0, StringValue.class).getValue();
-
-                       // normalize the line
-                       line = line.replaceAll("\\W+", " ").toLowerCase();
-                       
-                       // tokenize the line
-                       StringTokenizer tokenizer = new StringTokenizer(line);
-                       int numWords = 0;
-                       
-                       while (tokenizer.hasMoreTokens()) {
-                               String word = tokenizer.nextToken();
-                               
-                               distinctWords.add(new StringValue(word));
-                               ++numWords;
-                               
-                               // we emit a (word, 1) pair 
-                               collector.collect(new Record(new StringValue(word), new IntValue(1)));
-                       }
-                       
-                       // Add a value to the histogram accumulator
-                       this.wordsPerLine.add(numWords);
-               }
-       }
-
-       @Combinable
-       @ConstantFields(0)
-       public static class CountWords extends ReduceFunction implements Serializable {
-
-               private static final long serialVersionUID = 1L;
-
-               private final IntValue cnt = new IntValue();
-
-               @Override
-               public void reduce(Iterator<Record> records, Collector<Record> out) {
-                       Record element = null;
-                       int sum = 0;
-                       while (records.hasNext()) {
-                               element = records.next();
-                               IntValue i = element.getField(1, IntValue.class);
-                               sum += i.getValue();
-                       }
-
-                       this.cnt.setValue(sum);
-                       element.setField(1, this.cnt);
-                       out.collect(element);
-               }
-       }
-
-       @Override
-       public Plan getPlan(String... args) {
-               int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-               String dataInput = (args.length > 1 ? args[1] : "");
-               String output = (args.length > 2 ? args[2] : "");
-
-               FileDataSource source = new FileDataSource(new TextInputFormat(), dataInput, "Input Lines");
-
-               MapOperator mapper = MapOperator.builder(new TokenizeLine()).input(source).name("Tokenize Lines").build();
-               
-               ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0).input(mapper)
-                               .name("Count Words").build();
-               
-               FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, reducer, "Word Counts");
-               
-               CsvOutputFormat.configureRecordFormat(out).recordDelimiter('\n')
-                               .fieldDelimiter(' ').field(StringValue.class, 0)
-                               .field(IntValue.class, 1);
-
-               Plan plan = new Plan(out, "WordCount Example");
-               plan.setDefaultParallelism(numSubTasks);
-               return plan;
-       }
-
-       @Override
-       public String getDescription() {
-               return "Parameters: [numSubStasks] [input] [output]";
-       }
-
-       public static void main(String[] args) throws Exception {
-               WordCountAccumulators wc = new WordCountAccumulators();
-
-               if (args.length < 3) {
-                       System.err.println(wc.getDescription());
-                       System.exit(1);
-               }
-
-               Plan plan = wc.getPlan(args);
-
-               JobExecutionResult result = LocalExecutor.execute(plan);
-
-               // Accumulators can be accessed by their name. 
-               System.out.println("Number of lines counter: "+ result.getAccumulatorResult(TokenizeLine.ACCUM_NUM_LINES));
-               System.out.println("Words per line histogram: " + result.getAccumulatorResult(TokenizeLine.ACCUM_WORDS_PER_LINE));
-               System.out.println("Distinct words: " + result.getAccumulatorResult(TokenizeLine.ACCUM_DISTINCT_WORDS));
-       }
-
-       /**
-        * Custom accumulator
-        */
-       public static class SetAccumulator<T extends Value> implements Accumulator<T, HashSet<T>> {
-
-               private static final long serialVersionUID = 1L;
-
-               private HashSet<T> set = new HashSet<T>();
-
-               @Override
-               public void add(T value) {
-                       this.set.add(value);
-               }
-
-               @Override
-               public HashSet<T> getLocalValue() {
-                       return this.set;
-               }
-
-               @Override
-               public void resetLocal() {
-                       this.set.clear();
-               }
-
-               @Override
-               public void merge(Accumulator<T, HashSet<T>> other) {
-                       // build union
-                       this.set.addAll(other.getLocalValue());
-               }
-
-               @Override
-               public Accumulator<T, HashSet<T>> clone() {
-                       SetAccumulator<T> result = new SetAccumulator<T>();
-                       result.set.addAll(set);
-                       return result;
-               }
-       }
-}

Reply via email to