Repository: flink Updated Branches: refs/heads/master 8803304f4 -> 970b2b7af
[runtime] Remove obsolete line reader test classes Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ffecaca Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ffecaca Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ffecaca Branch: refs/heads/master Commit: 7ffecaca0803d3acfaebaa620833398b8861941b Parents: 8803304 Author: Stephan Ewen <se...@apache.org> Authored: Thu Feb 5 14:50:12 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Feb 5 18:49:56 2015 +0100 ---------------------------------------------------------------------- .../org/apache/flink/runtime/fs/LineReader.java | 165 ------------------- .../apache/flink/runtime/fs/LineReaderTest.java | 81 --------- 2 files changed, 246 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7ffecaca/flink-runtime/src/test/java/org/apache/flink/runtime/fs/LineReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/LineReader.java b/flink-runtime/src/test/java/org/apache/flink/runtime/fs/LineReader.java deleted file mode 100644 index 36b924e..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/LineReader.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -/** - * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache - * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for - * additional information regarding copyright ownership. - */ - -package org.apache.flink.runtime.fs; - -import java.io.IOException; - -import org.apache.flink.core.fs.FSDataInputStream; - -public class LineReader { - - private static final int CR = '\r'; - - private static final int LF = '\n'; - - private FSDataInputStream stream; - - private byte[] readBuffer; - - private byte[] wrapBuffer; - - private long lengthLeft; - - private int readPos; - - private int limit; - - private boolean overLimit; - - public LineReader(final FSDataInputStream strm, final long start, final long length, final int buffersize) - throws IOException { - this.stream = strm; - this.readBuffer = new byte[buffersize]; - this.wrapBuffer = new byte[256]; - - this.lengthLeft = length; - this.readPos = 0; - this.overLimit = false; - - if (start != 0) { - strm.seek(start); - readLine(); - } else { - fillBuffer(); - } - } - - private final boolean fillBuffer() throws IOException { - - int toRead = this.lengthLeft > this.readBuffer.length ? this.readBuffer.length : (int) this.lengthLeft; - if (this.lengthLeft <= 0) { - toRead = this.readBuffer.length; - this.overLimit = true; - } - - int read = this.stream.read(this.readBuffer, 0, toRead); - - if (read == -1) { - this.stream.close(); - this.stream = null; - return false; - } else { - this.lengthLeft -= read; - this.readPos = 0; - this.limit = read; - return true; - } - - } - - public void close() throws IOException { - this.wrapBuffer = null; - this.readBuffer = null; - if (this.stream != null) { - this.stream.close(); - } - } - - public byte[] readLine() throws IOException { - if (this.stream == null || this.overLimit) { - return null; - } - - int curr = 0; - int countInWrapBuffer = 0; - - while (true) { - if (this.readPos >= this.limit) { - if (!fillBuffer()) { - if (countInWrapBuffer > 0) { - byte[] tmp = new byte[countInWrapBuffer]; - System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer); - return tmp; - } else { - return null; - } - } - } - - int startPos = this.readPos; - int count = 0; - - while (this.readPos < this.limit && (curr = this.readBuffer[this.readPos++]) != LF) { - } - - // check why we dropped out - if (curr == LF) { - // line end - count = this.readPos - startPos - 1; - if (this.readPos == 1 && countInWrapBuffer > 0 && this.wrapBuffer[countInWrapBuffer - 1] == CR) { - countInWrapBuffer--; - } else if (this.readPos > startPos + 1 && this.readBuffer[this.readPos - 2] == CR) { - count--; - } - - // copy to byte array - if (countInWrapBuffer > 0) { - byte[] end = new byte[countInWrapBuffer + count]; - System.arraycopy(this.wrapBuffer, 0, end, 0, countInWrapBuffer); - System.arraycopy(this.readBuffer, 0, end, countInWrapBuffer, count); - return end; - } else { - byte[] end = new byte[count]; - System.arraycopy(this.readBuffer, startPos, end, 0, count); - return end; - } - } else { - count = this.limit - startPos; - - // buffer exhausted - while (this.wrapBuffer.length - countInWrapBuffer < count) { - // reallocate - byte[] tmp = new byte[this.wrapBuffer.length * 2]; - System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer); - this.wrapBuffer = tmp; - } - - System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, count); - countInWrapBuffer += count; - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ffecaca/flink-runtime/src/test/java/org/apache/flink/runtime/fs/LineReaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/LineReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/fs/LineReaderTest.java deleted file mode 100644 index 15da0dc..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/LineReaderTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.fs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.PrintWriter; - -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.fs.local.LocalFileSystem; -import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.junit.Test; - -/** - * This class tests the functionality of the LineReader class using a local filesystem. - * - */ - -public class LineReaderTest { - /** - * This test tests the LineReader. So far only under usual conditions. - */ - @Test - public void testLineReader() { - final File testfile = new File(CommonTestUtils.getTempDir() + File.separator - + CommonTestUtils.getRandomFilename()); - final Path pathtotestfile = new Path(testfile.toURI().getPath()); - - try { - PrintWriter pw = new PrintWriter(testfile, "UTF8"); - - for (int i = 0; i < 100; i++) { - pw.append("line\n"); - } - pw.close(); - - LocalFileSystem lfs = new LocalFileSystem(); - FSDataInputStream fis = lfs.open(pathtotestfile); - - // first, we test under "usual" conditions - final LineReader lr = new LineReader(fis, 0, testfile.length(), 256); - - byte[] buffer; - int linecount = 0; - while ((buffer = lr.readLine()) != null) { - assertEquals(new String(buffer, "UTF8"), "line"); - linecount++; - } - assertEquals(linecount, 100); - - // the linereader can not handle situations with larger length than the total file... - - } catch (Exception e) { - fail(e.toString()); - e.printStackTrace(); - } finally { - testfile.delete(); - } - - } - -}