http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/InfiniteIntegerInputFormatWithDelay.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/InfiniteIntegerInputFormatWithDelay.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/InfiniteIntegerInputFormatWithDelay.java deleted file mode 100644 index f1de48d..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/InfiniteIntegerInputFormatWithDelay.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.test.recordJobs.util; - -import java.io.IOException; - -import org.apache.flink.api.java.record.io.GenericInputFormat; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; - -public class InfiniteIntegerInputFormatWithDelay extends GenericInputFormat { - private static final long serialVersionUID = 1L; - - private static final int DELAY = 20; - - private final IntValue one = new IntValue(1); - - - @Override - public boolean reachedEnd() throws IOException { - return false; - } - - - @Override - public Record nextRecord(Record record) throws IOException { - record.setField(0, this.one); - - try { - Thread.sleep(DELAY); - } catch (InterruptedException iex) { - // do nothing - } - - return record; - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/IntTupleDataInFormat.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/IntTupleDataInFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/IntTupleDataInFormat.java deleted file mode 100644 index e5d32c6..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/IntTupleDataInFormat.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.test.recordJobs.util; - -import org.apache.flink.api.java.record.io.DelimitedInputFormat; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; - -public class IntTupleDataInFormat extends DelimitedInputFormat { - private static final long serialVersionUID = 1L; - - public static final int MAX_COLUMNS = 20; - - public static final int DELIMITER = '|'; - - private final IntValue key = new IntValue(); - private final int[] offsets = new int[MAX_COLUMNS]; - - @Override - public Record readRecord(Record target, byte[] line, int offset, int numBytes) { - final int limit = offset + numBytes; - int readPos = offset; - - // allocate the offsets array - final int[] offsets = this.offsets; - offsets[0] = offset; - - int col = 1; // the column we are in - - while (readPos < limit) { - if (line[readPos++] == DELIMITER) { - offsets[col++] = readPos; - } - } - - final Tuple value = new Tuple(line, offsets, col - 1); - this.key.setValue((int) value.getLongValueAt(0)); - - target.setField(0, this.key); - target.setField(1, value); - return target; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/StringTupleDataOutFormat.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/StringTupleDataOutFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/StringTupleDataOutFormat.java deleted file mode 100644 index e6b8548..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/StringTupleDataOutFormat.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.test.recordJobs.util; - -import org.apache.flink.api.java.record.io.DelimitedOutputFormat; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; - -public class StringTupleDataOutFormat extends DelimitedOutputFormat { - private static final long serialVersionUID = 1L; - - @Override - public int serializeRecord(Record rec, byte[] target) throws Exception { - String string = rec.getField(0, StringValue.class).toString(); - byte[] stringBytes = string.getBytes(); - Tuple tuple = rec.getField(1, Tuple.class); - String tupleStr = tuple.toString(); - byte[] tupleBytes = tupleStr.getBytes(); - int totalLength = stringBytes.length + 1 + tupleBytes.length; - if(target.length >= totalLength) { - System.arraycopy(stringBytes, 0, target, 0, stringBytes.length); - target[stringBytes.length] = '|'; - System.arraycopy(tupleBytes, 0, target, stringBytes.length + 1, tupleBytes.length); - return totalLength; - } else { - return -1 * totalLength; - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/Tuple.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/Tuple.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/Tuple.java deleted file mode 100644 index 064f15e..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/Tuple.java +++ /dev/null @@ -1,615 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.test.recordJobs.util; - -import java.io.IOException; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.types.Value; - -public class Tuple implements Value { - private static final long serialVersionUID = 1L; - - private byte[] bytes; - - private int[] offsets; - - private int numCols; - - /** - * Instantiates an empty tuple. - */ - public Tuple() { - numCols = 0; - } - - /** - * Creates a new tuple with a given set of attributes. - * - * @param bytes - * The bytes array. Attributes are separated by a single character. The last attribute - * is also terminated with a single character. - * @param offsets - * The offsets of the columns in the byte array. The last entry gives the offset of the terminating - * character + 1 (if the byte array exactly holds all attributes and delimiting characters this is - * the length of the array). - * @param cols - * The number of columns. - */ - public Tuple(byte[] bytes, int[] offsets, int cols) { - this.bytes = bytes; - this.offsets = offsets; - this.numCols = cols; - } - - // ------------------------------------------------------------------------ - // Accessors - // ------------------------------------------------------------------------ - - /** - * Returns the number of attributes / columns of the tuple. - * - * @return The number of columns of the tuple. - */ - public int getNumberOfColumns() { - return numCols; - } - - /** - * Returns the internal byte array of the tuple. - * - * @return The internal byte array of the tuple. - */ - public byte[] getBytes() { - return bytes; - } - - /** - * Returns the length of the column with the specified index. Column indices start at 0. - * - * @param colNumber Index of the column. Indices start at 0. - * @return The length of the specified column. - */ - public int getColumnLength(int colNumber) { - if(offsets == null) return -1; - if(colNumber < 0) return -1; - if(colNumber >= offsets.length) return -1; - return offsets[colNumber + 1] - offsets[colNumber] - 1; - } - - // ------------------------------------------------------------------------ - // Modification - // ------------------------------------------------------------------------ - - /** - * Appends all columns of the specified tuple to this tuple. - * - * @param other The tuple whose columns are appended to this tuple. - */ - public void concatenate(Tuple other) { - - if(other.getBytes() == null) return; - - if (bytes == null) { - bytes = (byte[]) other.bytes.clone(); - offsets = (int[]) other.offsets.clone(); - numCols = other.numCols; - } else { - int len = offsets[numCols]; - int otherLen = other.offsets[other.numCols]; - int totalLen = len + otherLen; - - // bytes: - // our content - if (bytes.length < totalLen) { - byte[] tmp = new byte[totalLen]; - System.arraycopy(bytes, 0, tmp, 0, len); - bytes = tmp; - } - - // the other's content - System.arraycopy(other.bytes, 0, bytes, len, otherLen); - - // offsets - if (offsets.length < numCols + other.numCols + 1) { - int[] tmp = new int[numCols + other.numCols + 1]; - System.arraycopy(offsets, 0, tmp, 0, numCols + 1); - offsets = tmp; - } - - // other offsets - for (int i = 1; i < other.numCols + 1; i++) { - offsets[numCols + i] = other.offsets[i] + len; - } - - numCols += other.numCols; - } - } - - /** - * Performs a projection on the tuple. - * The int parameter is interpreted as a bitmap on the columns. - * I.e. a bitmap value of 1 projects to the first column, 2 to the second, 3 to the first two columns, and so on. - * - * @param bitmap the projection bitmap. - */ - public void project(int bitmap) { - int[] lengths = new int[numCols]; - int lenCount = 0; - - if(bytes == null || offsets == null) return; - - // go through the bitmap and find the indexes of the columns to retain - int k = 0; - for (int i = 0; bitmap != 0 && i < numCols; i++, bitmap >>>= 1) { - if ((bitmap & 0x1) != 0) { - int len = offsets[i + 1] - offsets[i]; - lengths[k] = len; - lenCount += len; - offsets[k] = offsets[i]; - k++; - } - } - numCols = k; - - // allocate the new (smaller) array - byte[] tmp = new byte[lenCount]; - lenCount = 0; - - // copy the columns to the beginning and adjust the offsets to the new array - for (int i = 0; i < k; i++) { - System.arraycopy(bytes, offsets[i], tmp, lenCount, lengths[i]); - offsets[i] = lenCount; - lenCount += lengths[i]; - } - - bytes = tmp; - offsets[numCols] = tmp.length; - } - - /** - * Compares a String attribute of this tuple with a String attribute of another tuple. - * The strings are compared lexicographic. - * - * @param other The other tuple. - * @param thisColumn The index of this tuple's String attribute. - * @param otherColumn The index of the other tuple's String attribute. - * @return 1 if this tuple's attribute is greater, 0 if both attributes have the same value, - * -1 if this tuple's attribute is smaller. - * @throws IndexOutOfBoundsException Thrown if one of the column indices is invalid (smaller than 0 or bigger - * than the attribute count). - */ - public int compareStringAttribute(Tuple other, int thisColumn, int otherColumn) { - - if(thisColumn < 0) throw new IndexOutOfBoundsException(); - if(otherColumn < 0) throw new IndexOutOfBoundsException(); - if(thisColumn >= numCols) throw new IndexOutOfBoundsException(); - if(otherColumn >= other.numCols) throw new IndexOutOfBoundsException(); - - int len = getColumnLength(thisColumn); - int otherLen = other.getColumnLength(otherColumn); - int min = Math.min(len, otherLen); - - int startPos = offsets[thisColumn]; - int otherStartPos = other.offsets[otherColumn]; - - for (int i = 0; i < min; i++) { - if (bytes[startPos + i] < other.bytes[otherStartPos + i]) { - return -1; - } else if (bytes[startPos + i] > other.bytes[otherStartPos + i]) { - return 1; - } - } - - if (len < otherLen) { - return -1; - } else if (len > otherLen) { - return 1; - } else { - return 0; - } - } - - /** - * Compares an Integer attribute of this tuple with an Integer attribute of another tuple. - * - * @param other The other tuple. - * @param thisColumn The index of this tuple's int attribute. - * @param otherColumn The index of the other tuple's int attribute. - * @return 1 if this tuple's attribute is greater, 0 if both attributes have the same value, - * -1 if this tuple's attribute is smaller. - * @throws IndexOutOfBoundsException Thrown if one of the column indices is invalid (smaller than 0 or bigger - * than the attribute count). - */ - public int compareIntAttribute(Tuple other, int thisColumn, int otherColumn) { - int len = getColumnLength(thisColumn); - int otherLen = other.getColumnLength(otherColumn); - - if(thisColumn < 0) throw new IndexOutOfBoundsException(); - if(otherColumn < 0) throw new IndexOutOfBoundsException(); - if(thisColumn >= numCols) throw new IndexOutOfBoundsException(); - if(otherColumn >= other.numCols) throw new IndexOutOfBoundsException(); - - short thisNegative = 1; - short otherNegative = 1; - - if(this.bytes[offsets[thisColumn]] == '-') { - thisNegative = -1; - } - - if(other.getBytes()[other.offsets[otherColumn]] == '-') { - otherNegative = -1; - } - - // check one int is negative - if(thisNegative != otherNegative) { - return thisNegative; - } - - // check if they vary in length - if (len < otherLen) { - return -1 * thisNegative; - } else if (len > otherLen) { - return 1 * thisNegative; - } - - // both have the same orientation and length, check digit-wise - int myStartPos = offsets[thisColumn]; - int compStartPos = other.offsets[otherColumn]; - - for (int i = 0; i < len; i++) { - if (bytes[myStartPos + i] < other.bytes[compStartPos + i]) { - return -1 * thisNegative; - } else if (bytes[myStartPos + i] > other.bytes[compStartPos + i]) { - return 1 * thisNegative; - } - } - return 0; - - } - - /** - * Returns the String value of the attribute with the specified index. - * - * @param column The index of the attribute whose String value is returned. - * @return The String value of the specified attribute. - * @throws IndexOutOfBoundsException Thrown if the index of the column is invalid (smaller than 0 or bigger - * than the attribute count). - */ - public String getStringValueAt(int column) throws IndexOutOfBoundsException { - // check for validity of column index - if(column < 0) throw new IndexOutOfBoundsException(); - if(column >= numCols) throw new IndexOutOfBoundsException(); - - int off = offsets[column]; - int len = getColumnLength(column); - - char[] chars = new char[len]; - for (int i = 0; i < len; i++) { - chars[i] = (char) (bytes[off + i] & 0xff); - } - - return new String(chars); - } - - /** - * Returns the Long value of the attribute with the specified index. - * The value must be represented in the decimal system. - * - * @param column The index of the attribute whose value is returned as long. - * @return The long value of the specified attribute. - * @throws IndexOutOfBoundsException Thrown if the index of the column is invalid (smaller than 0 or bigger - * than the attribute count). - * @throws NumberFormatException Thrown if the attribute is not a valid long value - * (contains any other character than digits or '-'.) - */ - public long getLongValueAt(int column) throws IndexOutOfBoundsException, NumberFormatException { - - if(column < 0) throw new IndexOutOfBoundsException(); - if(column >= numCols) throw new IndexOutOfBoundsException(); - - int off = offsets[column]; - int len = getColumnLength(column); - - boolean isNegative = false; - - if(bytes[off] == '-') { - isNegative = true; - off++; - len--; - } - - long value = 0; - for (int i = off; i < off + len; i++) { - - if(bytes[i] < '0' || bytes[i] > '9') throw new NumberFormatException(); - - value *= 10; - value += (bytes[i] - 48); - } - - if(isNegative) { - value *= -1; - } - - return value; - } - - /** - * Returns an attribute which is specified by an index as byte array. - * - * @param column The index of the attribute which is returned as byte array. - * @return The value of the specified attribute as byte array value. - * @throws IndexOutOfBoundsException Thrown if the index of the column is invalid (smaller than 0 or bigger - * than the attribute count). - */ - public byte[] getByteArrayValueAt(int column) throws IndexOutOfBoundsException { - - if(column < 0) throw new IndexOutOfBoundsException(); - if(column >= numCols) throw new IndexOutOfBoundsException(); - - int len = getColumnLength(column); - byte[] buffer = new byte[len]; - System.arraycopy(bytes, offsets[column], buffer, 0, len); - return buffer; - } - - /** - * Sets the size of the internal byte array of the tuple to the minimum capacity. - * If the minimum capacity is smaller than the current size of the tuple's byte array, - * nothing is done. Otherwise a new byte array is allocated and the content of the old one copied. - * - * @param minCapacity The new size of the internal byte array. - */ - public void reserveSpace(int minCapacity) { - if (bytes.length < minCapacity) { - byte[] tmp = new byte[minCapacity]; - System.arraycopy(bytes, 0, tmp, 0, offsets[numCols]); - bytes = tmp; - } - } - - /** - * Reduces the size of the internal byte and offset arrays to the currently used size. - */ - public void compact() { - int len = offsets[numCols]; - - if (bytes.length > len) { - byte[] tmp = new byte[len]; - System.arraycopy(bytes, 0, tmp, 0, len); - bytes = tmp; - } - - if (offsets.length > numCols + 1) { - int[] tmp = new int[numCols + 1]; - System.arraycopy(offsets, 0, tmp, 0, numCols + 1); - offsets = tmp; - } - } - - /** - * Appends an attribute at the end of the tuple. - * - * @param attValue The attribute to append. - */ - public void addAttribute(byte[] attValue) { - int end; - - if (numCols == 0) { - offsets = new int[5]; - bytes = new byte[Math.max(256, attValue.length + 1)]; - end = 0; - } else { - end = offsets[numCols]; - - // increase offset array, if necessary - if (numCols + 1 >= offsets.length) { - int[] tmp = new int[offsets.length * 2]; - System.arraycopy(offsets, 0, tmp, 0, numCols + 1); - offsets = tmp; - } - - // increase byte buffer, if necessary - if (bytes.length < end + attValue.length + 1) { - byte[] tmp = new byte[bytes.length + attValue.length + 1]; - System.arraycopy(bytes, 0, tmp, 0, end); - bytes = tmp; - } - } - - // copy bytes, offsets and increase columns - System.arraycopy(attValue, 0, bytes, end, attValue.length); - end += attValue.length; - bytes[end++] = '|'; - numCols++; - offsets[numCols] = end; - } - - /** - * Appends an attribute at the end of the tuple. - * - * @param attValue The attribute to append. - */ - public void addAttribute(String attValue) { - int end; - - if (numCols == 0) { - offsets = new int[5]; - bytes = new byte[Math.max(256, attValue.length() + 1)]; - end = 0; - } else { - end = offsets[numCols]; - - // increase offset array, if necessary - if (numCols + 1 >= offsets.length) { - int[] tmp = new int[offsets.length * 2]; - System.arraycopy(offsets, 0, tmp, 0, numCols + 1); - offsets = tmp; - } - - // increase byte buffer, if necessary - if (bytes.length < end + attValue.length() + 1) { - byte[] tmp = new byte[bytes.length + attValue.length() + 1]; - System.arraycopy(bytes, 0, tmp, 0, end); - bytes = tmp; - } - } - - // copy bytes, offsets and increase columns - for (int i = 0; i < attValue.length(); i++, end++) { - bytes[end] = (byte) (attValue.charAt(i) & 0xff); - } - bytes[end++] = '|'; - numCols++; - offsets[numCols] = end; - } - - /** - * Appends an attribute by copying it from another tuple. - * - * @param other The other tuple to copy from. - * @param column The index of the attribute to copy within the other tuple. - */ - public void addAttributeFromKVRecord(Tuple other, int column) { - int len = other.getColumnLength(column) + 1; - int end; - - if (numCols == 0) { - offsets = new int[5]; - bytes = new byte[Math.max(256, len)]; - end = 0; - } else { - end = offsets[numCols]; - - // increase offset array, if necessary - if (numCols + 1 >= offsets.length) { - int[] tmp = new int[offsets.length * 2]; - System.arraycopy(offsets, 0, tmp, 0, numCols + 1); - offsets = tmp; - } - - // increase byte buffer, if necessary - if (bytes.length < end + len) { - byte[] tmp = new byte[end + len]; - System.arraycopy(bytes, 0, tmp, 0, end); - bytes = tmp; - } - } - - System.arraycopy(other.bytes, other.offsets[column], bytes, end, len); - numCols++; - offsets[numCols] = end + len; - } - - public void setContents(byte[] bytes, int offset, int len, char delimiter) - { - // make space - if (this.bytes == null || this.bytes.length < len) { - this.bytes = new byte[len]; - } - - // copy the binary data - System.arraycopy(bytes, offset, this.bytes, 0, len); - - int readPos = offset; - - // allocate the offsets array - if (this.offsets == null) { - this.offsets = new int[4]; - } - - int col = 1; // the column we are in - - int startPos = readPos; - - while (readPos < offset + len) { - if (bytes[readPos++] == delimiter) { - if (offsets.length <= col) { - int newOffsets[] = new int[this.offsets.length * 2]; - System.arraycopy(this.offsets, 0, newOffsets, 0, this.offsets.length); - this.offsets = newOffsets; - } - this.offsets[col++] = readPos - startPos; - } - } - - this.numCols = col - 1; - } - - - // ------------------------------------------------------------------------ - // Serialization - // ------------------------------------------------------------------------ - - @Override - public void read(DataInputView in) throws IOException { - // read the bytes - int numBytes = in.readInt(); - if (numBytes > 0) { - bytes = new byte[numBytes]; - in.readFully(bytes); - - // read the offsets - numCols = in.readInt() + 1; - offsets = new int[numCols + 1]; - for (int i = 1; i < numCols; i++) { - offsets[i] = in.readInt(); - } - // set last offset - offsets[numCols] = numBytes; - } else { - numCols = 0; - } - } - - @Override - public void write(DataOutputView out) throws IOException { - // write the bytes - int numBytes = (numCols > 0 ? offsets[numCols] : 0); - out.writeInt(numBytes); - if (numBytes > 0) { - out.write(bytes, 0, numBytes); - - // write the offsets - // exclude first and last - out.writeInt(numCols - 1); - for (int i = 1; i < numCols; i++) { - out.writeInt(offsets[i]); - } - } - } - - @Override - public String toString() { - StringBuilder bld = new StringBuilder(); - - for (int i = 0; i < numCols; i++) { - for (int k = 0; k < getColumnLength(i); k++) { - bld.append((char) (bytes[offsets[i] + k] & 0xff)); - } - bld.append('|'); - } - - return bld.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/UniformIntInput.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/UniformIntInput.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/UniformIntInput.java deleted file mode 100644 index 5e880cb..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/util/UniformIntInput.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.test.recordJobs.util; - -import org.apache.flink.api.java.record.io.GenericInputFormat; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; - -/** - * - */ -public class UniformIntInput extends GenericInputFormat { - private static final long serialVersionUID = 1L; - - public static final String NUM_KEYS_KEY = "testfomat.numkeys"; - public static final String NUM_VALUES_KEY = "testfomat.numvalues"; - - private static final int DEFAULT_NUM_KEYS = 1000; - private static final int DEFAULT_NUM_VALUES = 1000; - - private final IntValue key = new IntValue(); - private final IntValue value = new IntValue(); - - private int numKeys; - private int numValues; - - private int keyInt; - private int valueInt; - - public UniformIntInput() { - this(DEFAULT_NUM_KEYS, DEFAULT_NUM_VALUES); - } - - public UniformIntInput(final int numKeys, final int numValues) { - this.numKeys = numKeys; - this.numValues = numValues; - } - - - - - @Override - public void configure(Configuration parameters) { - super.configure(parameters); - - this.numKeys = parameters.getInteger(NUM_KEYS_KEY, this.numKeys); - this.numValues = parameters.getInteger(NUM_VALUES_KEY, this.numValues); - } - - - @Override - public boolean reachedEnd() { - return this.valueInt >= this.numValues; - } - - - @Override - public Record nextRecord(Record record) { - if (this.keyInt == this.numKeys) { - this.keyInt = 0; - this.valueInt++; - } - - this.key.setValue(this.keyInt); - this.value.setValue(this.valueInt); - - record.setField(0, this.key); - record.setField(1, this.value); - record.updateBinaryRepresenation(); - - this.keyInt++; - - return record; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java deleted file mode 100644 index 2fcc523..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.recordJobs.wordcount; - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.Program; -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields; -import org.apache.flink.api.java.record.functions.MapFunction; -import org.apache.flink.api.java.record.functions.ReduceFunction; -import org.apache.flink.api.java.record.io.CsvOutputFormat; -import org.apache.flink.api.java.record.io.TextInputFormat; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable; -import org.apache.flink.client.LocalExecutor; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.apache.flink.util.Collector; - -import java.util.Iterator; -import java.util.StringTokenizer; -import java.util.concurrent.TimeUnit; - -/** - * Implements a word count which takes the input file and counts the number of - * the occurrences of each word in the file. - * - * <br><br> - * - * <b>Note</b>: This example uses the out-dated Record API. - * It is recommended to use the new Java API. - * - */ -@SuppressWarnings("deprecation") -public class WordCount implements Program, ProgramDescription { - - private static final long serialVersionUID = 1L; - - - /** - * Converts a Record containing one string in to multiple string/integer pairs. - * The string is tokenized by whitespaces. For each token a new record is emitted, - * where the token is the first field and an Integer(1) is the second field. - */ - public static class TokenizeLine extends MapFunction { - private static final long serialVersionUID = 1L; - - @Override - public void map(Record record, Collector<Record> collector) { - // get the first field (as type StringValue) from the record - String line = record.getField(0, StringValue.class).getValue(); - - // normalize the line - line = line.replaceAll("\\W+", " ").toLowerCase(); - - // tokenize the line - StringTokenizer tokenizer = new StringTokenizer(line); - while (tokenizer.hasMoreTokens()) { - String word = tokenizer.nextToken(); - - // we emit a (word, 1) pair - collector.collect(new Record(new StringValue(word), new IntValue(1))); - } - } - } - - /** - * Sums up the counts for a certain given key. The counts are assumed to be at position <code>1</code> - * in the record. The other fields are not modified. - */ - @Combinable - @ConstantFields(0) - public static class CountWords extends ReduceFunction { - - private static final long serialVersionUID = 1L; - - @Override - public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception { - Record element = null; - int sum = 0; - while (records.hasNext()) { - element = records.next(); - int cnt = element.getField(1, IntValue.class).getValue(); - sum += cnt; - } - - element.setField(1, new IntValue(sum)); - out.collect(element); - } - - @Override - public void combine(Iterator<Record> records, Collector<Record> out) throws Exception { - // the logic is the same as in the reduce function, so simply call the reduce method - reduce(records, out); - } - } - - - @Override - public Plan getPlan(String... args) { - // parse job parameters - int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1); - String dataInput = (args.length > 1 ? args[1] : ""); - String output = (args.length > 2 ? args[2] : ""); - - FileDataSource source = new FileDataSource(new TextInputFormat(), dataInput, "Input Lines"); - MapOperator mapper = MapOperator.builder(new TokenizeLine()) - .input(source) - .name("Tokenize Lines") - .build(); - ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0) - .input(mapper) - .name("Count Words") - .build(); - - @SuppressWarnings("unchecked") - FileDataSink out = new FileDataSink(new CsvOutputFormat("\n", " ", StringValue.class, IntValue.class), output, reducer, "Word Counts"); - - Plan plan = new Plan(out, "WordCount Example"); - plan.setDefaultParallelism(numSubTasks); - return plan; - } - - @Override - public String getDescription() { - return "Parameters: <numSubStasks> <input> <output>"; - } - - - public static void main(String[] args) throws Exception { - WordCount wc = new WordCount(); - - if (args.length < 3) { - System.err.println(wc.getDescription()); - System.exit(1); - } - - Plan plan = wc.getPlan(args); - - // This will execute the word-count embedded in a local context. replace this line by the commented - // succeeding line to send the job to a local installation or to a cluster for execution - JobExecutionResult result = LocalExecutor.execute(plan); - System.err.println("Total runtime: " + result.getNetRuntime(TimeUnit.MILLISECONDS) + " ms"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ebba20df/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCountAccumulators.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCountAccumulators.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCountAccumulators.java deleted file mode 100644 index 780db58..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCountAccumulators.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.recordJobs.wordcount; - -import java.io.Serializable; -import java.util.HashSet; -import java.util.Iterator; -import java.util.StringTokenizer; - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.Program; -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.accumulators.Histogram; -import org.apache.flink.api.common.accumulators.LongCounter; -import org.apache.flink.api.java.record.functions.MapFunction; -import org.apache.flink.api.java.record.functions.ReduceFunction; -import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields; -import org.apache.flink.api.java.record.io.CsvOutputFormat; -import org.apache.flink.api.java.record.io.TextInputFormat; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable; -import org.apache.flink.client.LocalExecutor; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.apache.flink.types.Value; -import org.apache.flink.util.Collector; - -/** - * This is similar to the WordCount example and additionally demonstrates how to - * use custom accumulators (built-in or custom). - */ -@SuppressWarnings("deprecation") -public class WordCountAccumulators implements Program, ProgramDescription { - - private static final long serialVersionUID = 1L; - - public static class TokenizeLine extends MapFunction implements Serializable { - private static final long serialVersionUID = 1L; - - // For efficiency it is recommended to have member variables for the accumulators - public static final String ACCUM_NUM_LINES = "accumulator.num-lines"; - private LongCounter numLines = new LongCounter(); - - // This histogram accumulator collects the distribution of number of words per line - public static final String ACCUM_WORDS_PER_LINE = "accumulator.words-per-line"; - private Histogram wordsPerLine = new Histogram(); - - public static final String ACCUM_DISTINCT_WORDS = "accumulator.distinct-words"; - private SetAccumulator<StringValue> distinctWords = new SetAccumulator<StringValue>(); - - - @Override - public void open(Configuration parameters) throws Exception { - - // Accumulators have to be registered to the system - getRuntimeContext().addAccumulator(ACCUM_NUM_LINES, this.numLines); - getRuntimeContext().addAccumulator(ACCUM_WORDS_PER_LINE, this.wordsPerLine); - getRuntimeContext().addAccumulator(ACCUM_DISTINCT_WORDS, this.distinctWords); - - // You could also write to accumulators in open() or close() - } - - @Override - public void map(Record record, Collector<Record> collector) { - - // Increment counter - numLines.add(1L); - - // get the first field (as type StringValue) from the record - String line = record.getField(0, StringValue.class).getValue(); - - // normalize the line - line = line.replaceAll("\\W+", " ").toLowerCase(); - - // tokenize the line - StringTokenizer tokenizer = new StringTokenizer(line); - int numWords = 0; - - while (tokenizer.hasMoreTokens()) { - String word = tokenizer.nextToken(); - - distinctWords.add(new StringValue(word)); - ++numWords; - - // we emit a (word, 1) pair - collector.collect(new Record(new StringValue(word), new IntValue(1))); - } - - // Add a value to the histogram accumulator - this.wordsPerLine.add(numWords); - } - } - - @Combinable - @ConstantFields(0) - public static class CountWords extends ReduceFunction implements Serializable { - - private static final long serialVersionUID = 1L; - - private final IntValue cnt = new IntValue(); - - @Override - public void reduce(Iterator<Record> records, Collector<Record> out) { - Record element = null; - int sum = 0; - while (records.hasNext()) { - element = records.next(); - IntValue i = element.getField(1, IntValue.class); - sum += i.getValue(); - } - - this.cnt.setValue(sum); - element.setField(1, this.cnt); - out.collect(element); - } - } - - @Override - public Plan getPlan(String... args) { - int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1); - String dataInput = (args.length > 1 ? args[1] : ""); - String output = (args.length > 2 ? args[2] : ""); - - FileDataSource source = new FileDataSource(new TextInputFormat(), dataInput, "Input Lines"); - - MapOperator mapper = MapOperator.builder(new TokenizeLine()).input(source).name("Tokenize Lines").build(); - - ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0).input(mapper) - .name("Count Words").build(); - - FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, reducer, "Word Counts"); - - CsvOutputFormat.configureRecordFormat(out).recordDelimiter('\n') - .fieldDelimiter(' ').field(StringValue.class, 0) - .field(IntValue.class, 1); - - Plan plan = new Plan(out, "WordCount Example"); - plan.setDefaultParallelism(numSubTasks); - return plan; - } - - @Override - public String getDescription() { - return "Parameters: [numSubStasks] [input] [output]"; - } - - public static void main(String[] args) throws Exception { - WordCountAccumulators wc = new WordCountAccumulators(); - - if (args.length < 3) { - System.err.println(wc.getDescription()); - System.exit(1); - } - - Plan plan = wc.getPlan(args); - - JobExecutionResult result = LocalExecutor.execute(plan); - - // Accumulators can be accessed by their name. - System.out.println("Number of lines counter: "+ result.getAccumulatorResult(TokenizeLine.ACCUM_NUM_LINES)); - System.out.println("Words per line histogram: " + result.getAccumulatorResult(TokenizeLine.ACCUM_WORDS_PER_LINE)); - System.out.println("Distinct words: " + result.getAccumulatorResult(TokenizeLine.ACCUM_DISTINCT_WORDS)); - } - - /** - * Custom accumulator - */ - public static class SetAccumulator<T extends Value> implements Accumulator<T, HashSet<T>> { - - private static final long serialVersionUID = 1L; - - private HashSet<T> set = new HashSet<T>(); - - @Override - public void add(T value) { - this.set.add(value); - } - - @Override - public HashSet<T> getLocalValue() { - return this.set; - } - - @Override - public void resetLocal() { - this.set.clear(); - } - - @Override - public void merge(Accumulator<T, HashSet<T>> other) { - // build union - this.set.addAll(other.getLocalValue()); - } - - @Override - public Accumulator<T, HashSet<T>> clone() { - SetAccumulator<T> result = new SetAccumulator<T>(); - result.set.addAll(set); - return result; - } - } -}