DRILL-3149: TextReader should support multibyte line delimiters
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/223507b7 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/223507b7 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/223507b7 Branch: refs/heads/master Commit: 223507b76ff6c2227e667ae4a53f743c92edd295 Parents: f86c4fa Author: Arina Ielchiieva <[email protected]> Authored: Mon Apr 25 19:15:02 2016 +0300 Committer: Parth Chandra <[email protected]> Committed: Sat Jun 18 17:02:59 2016 -0700 ---------------------------------------------------------------------- .../dfs/FormatPluginOptionsDescriptor.java | 5 + .../store/easy/text/compliant/TextInput.java | 105 +++++++++---------- .../text/compliant/TextParsingSettings.java | 3 - .../org/apache/drill/TestSelectWithOption.java | 74 +++++++++++-- 4 files changed, 115 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/223507b7/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java index 34a20e8..d3b2d5e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java @@ -26,6 +26,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang3.StringEscapeUtils; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.TableInstance; @@ -150,6 +151,10 @@ final class FormatPluginOptionsDescriptor { // when null is passed, we leave the default defined in the config class continue; } + if (param instanceof String) { + // normalize Java literals, ex: \t, \n, \r + param = StringEscapeUtils.unescapeJava((String) param); + } TableParamDef paramDef = t.sig.params.get(i); TableParamDef expectedParamDef = this.functionParamsByName.get(paramDef.name); if (expectedParamDef == null || expectedParamDef.type != paramDef.type) { http://git-wip-us.apache.org/repos/asf/drill/blob/223507b7/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java index 513476f..d8b1672 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java @@ -56,9 +56,7 @@ import com.univocity.parsers.common.Format; */ final class TextInput { - private static final byte NULL_BYTE = (byte) '\0'; - private final byte lineSeparator1; - private final byte lineSeparator2; + private final byte[] lineSeparator; private final byte normalizedLineSeparator; private final TextParsingSettings settings; @@ -91,7 +89,7 @@ final class TextInput { * Whether there was a possible partial line separator on the previous * read so we dropped it and it should be appended to next read. */ - private boolean remByte = false; + private int remByte = -1; /** * The current position in the buffer. @@ -107,13 +105,12 @@ final class TextInput { /** * Creates a new instance with the mandatory characters for handling newlines transparently. - * @param lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()} - * @param normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()}) that is used to replace any lineSeparator sequence found in the input. + * lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()} + * normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()}) that is used to replace any lineSeparator sequence found in the input. */ public TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) { - byte[] lineSeparator = settings.getNewLineDelimiter(); + this.lineSeparator = settings.getNewLineDelimiter(); byte normalizedLineSeparator = settings.getNormalizedNewLine(); - Preconditions.checkArgument(lineSeparator != null && (lineSeparator.length == 1 || lineSeparator.length == 2), "Invalid line separator. Expected 1 to 2 characters"); Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable."); boolean isCompressed = input instanceof CompressionInputStream ; Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream."); @@ -138,8 +135,6 @@ final class TextInput { this.startPos = startPos; this.endPos = endPos; - this.lineSeparator1 = lineSeparator[0]; - this.lineSeparator2 = lineSeparator.length == 2 ? lineSeparator[1] : NULL_BYTE; this.normalizedLineSeparator = normalizedLineSeparator; this.buffer = readBuffer; @@ -196,23 +191,25 @@ final class TextInput { private final void read() throws IOException { if(bufferReadable){ - if(remByte){ - underlyingBuffer.put(lineSeparator1); - remByte = false; + if(remByte != -1){ + for (int i = 0; i <= remByte; i++) { + underlyingBuffer.put(lineSeparator[i]); + } + remByte = -1; } length = inputFS.read(underlyingBuffer); }else{ byte[] b = new byte[underlyingBuffer.capacity()]; - if(remByte){ - b[0] = lineSeparator1; - length = input.read(b, 1, b.length - 1); - remByte = false; + if(remByte != -1){ + int remBytesNum = remByte + 1; + System.arraycopy(lineSeparator, 0, b, 0, remBytesNum); + length = input.read(b, remBytesNum, b.length - remBytesNum); + remByte = -1; }else{ length = input.read(b); } - underlyingBuffer.put(b); } } @@ -251,46 +248,31 @@ final class TextInput { * adjusts so that we can only read to the last character of the first line that crosses * the split boundary. */ - private void updateLengthBasedOnConstraint(){ - // we've run over our alotted data. - final byte lineSeparator1 = this.lineSeparator1; - final byte lineSeparator2 = this.lineSeparator2; - + private void updateLengthBasedOnConstraint() { // find the next line separator: final long max = bStart + length; - for(long m = this.bStart + (endPos - streamPos); m < max; m++){ - if(PlatformDependent.getByte(m) == lineSeparator1){ - // we found a potential line break. - - if(lineSeparator2 == NULL_BYTE){ - // we found a line separator and don't need to consult the next byte. - length = (int)(m - bStart) + 1; // make sure we include line separator otherwise query may fail (DRILL-4317) - endFound = true; - break; - }else{ - // this is a two byte line separator. - - long mPlus = m+1; - if(mPlus < max){ - // we can check next byte and see if the second lineSeparator is correct. - if(lineSeparator2 == PlatformDependent.getByte(mPlus)){ - length = (int)(mPlus - bStart); - endFound = true; - break; - }else{ - // this was a partial line break. - continue; - } - }else{ - // the last character of the read was a remnant byte. We'll hold off on dealing with this byte until the next read. - remByte = true; - length -= 1; - break; + for (long m = this.bStart + (endPos - streamPos); m < max; m++) { + long mTemp = m - 1; + for (int i = 0; i < lineSeparator.length; i++) { + mTemp++; + if (PlatformDependent.getByte(mTemp) == lineSeparator[i]) { + if (mTemp < max) { + continue; + } else { + // remnant bytes + // the last N characters of the read were a remnant bytes. We'll hold off on dealing with these bytes until the next read. + remByte = i; + length -= (i + 1); + return; } - } + break; } + // we found line delimiter + length = (int) (mTemp - bStart); + endFound = true; + break; } } @@ -301,8 +283,6 @@ final class TextInput { * @throws IOException */ public final byte nextChar() throws IOException { - final byte lineSeparator1 = this.lineSeparator1; - final byte lineSeparator2 = this.lineSeparator2; if (length == -1) { throw StreamFinishedPseudoException.INSTANCE; @@ -326,12 +306,20 @@ final class TextInput { bufferPtr++; // monitor for next line. - if (lineSeparator1 == byteChar && (lineSeparator2 == NULL_BYTE || lineSeparator2 == buffer.getByte(bufferPtr - 1))) { - lineCount++; + int bufferPtrTemp = bufferPtr - 1; + if (byteChar == lineSeparator[0]) { + for (int i = 1; i < lineSeparator.length; i++, bufferPtrTemp++) { + if (lineSeparator[i] != buffer.getByte(bufferPtrTemp)) { + return byteChar; + } + } - if (lineSeparator2 != NULL_BYTE) { - byteChar = normalizedLineSeparator; + lineCount++; + byteChar = normalizedLineSeparator; + // we don't need to update buffer position if line separator is one byte long + if (lineSeparator.length > 1) { + bufferPtr += (lineSeparator.length - 1); if (bufferPtr >= length) { if (length != -1) { updateBuffer(); @@ -341,6 +329,7 @@ final class TextInput { } } } + return byteChar; } http://git-wip-us.apache.org/repos/asf/drill/blob/223507b7/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java index a366c90..41bb33d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.store.easy.text.compliant; import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig; import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; import com.univocity.parsers.common.TextParsingException; public class TextParsingSettings { @@ -51,8 +50,6 @@ public class TextParsingSettings { this.quote = bSafe(config.getQuote(), "quote"); this.quoteEscape = bSafe(config.getEscape(), "escape"); this.newLineDelimiter = config.getLineDelimiter().getBytes(Charsets.UTF_8); - Preconditions.checkArgument(newLineDelimiter.length == 1 || newLineDelimiter.length == 2, - String.format("Line delimiter must be 1 or 2 bytes in length. The provided delimiter was %d bytes long.", newLineDelimiter.length)); this.delimiter = bSafe(config.getFieldDelimiter(), "fieldDelimiter"); this.comment = bSafe(config.getComment(), "comment"); this.skipFirstLine = config.isSkipFirstLine(); http://git-wip-us.apache.org/repos/asf/drill/blob/223507b7/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java index c74480b..111313b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java @@ -25,7 +25,6 @@ import java.io.FileWriter; import java.io.IOException; import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory; -import org.junit.Ignore; import org.junit.Test; public class TestSelectWithOption extends BaseTestQuery { @@ -78,19 +77,72 @@ public class TestSelectWithOption extends BaseTestQuery { ); } - @Test @Ignore // It does not look like lineDelimiter is working - public void testTextLineDelimiter() throws Exception { + @Test + public void testTabFieldDelimiter() throws Exception { + String tableName = genCSVTable("testTabFieldDelimiter", + "1\ta", + "2\tb"); + String fieldDelimiter = new String(new char[]{92, 116}); // represents \t + testWithResult(format("select columns from table(%s(type=>'TeXT', fieldDelimiter => '%s'))", tableName, fieldDelimiter), + listOf("1", "a"), + listOf("2", "b")); + } + + @Test + public void testSingleTextLineDelimiter() throws Exception { + String tableName = genCSVTable("testSingleTextLineDelimiter", + "a|b|c"); + + testWithResult(format("select columns from table(%s(type => 'TeXT', lineDelimiter => '|'))", tableName), + listOf("a"), + listOf("b"), + listOf("c")); + } + + @Test + // '\n' is treated as standard delimiter + // if user has indicated custom line delimiter but input file contains '\n', split will occur on both + public void testCustomTextLineDelimiterAndNewLine() throws Exception { String tableName = genCSVTable("testTextLineDelimiter", - "\"b\"|\"0\"", - "\"b\"|\"1\"", - "\"b\"|\"2\""); + "b|1", + "b|2"); testWithResult(format("select columns from table(%s(type => 'TeXT', lineDelimiter => '|'))", tableName), - listOf("\"b\""), - listOf("\"0\"", "\"b\""), - listOf("\"1\"", "\"b\""), - listOf("\"2\"") - ); + listOf("b"), + listOf("1"), + listOf("b"), + listOf("2")); + } + + @Test + public void testTextLineDelimiterWithCarriageReturn() throws Exception { + String tableName = genCSVTable("testTextLineDelimiterWithCarriageReturn", + "1, a\r", + "2, b\r"); + String lineDelimiter = new String(new char[]{92, 114, 92, 110}); // represents \r\n + testWithResult(format("select columns from table(%s(type=>'TeXT', lineDelimiter => '%s'))", tableName, lineDelimiter), + listOf("1, a"), + listOf("2, b")); + } + + @Test + public void testMultiByteLineDelimiter() throws Exception { + String tableName = genCSVTable("testMultiByteLineDelimiter", + "1abc2abc3abc"); + testWithResult(format("select columns from table(%s(type=>'TeXT', lineDelimiter => 'abc'))", tableName), + listOf("1"), + listOf("2"), + listOf("3")); + } + + @Test + public void testDataWithPartOfMultiByteLineDelimiter() throws Exception { + String tableName = genCSVTable("testDataWithPartOfMultiByteLineDelimiter", + "ab1abc2abc3abc"); + testWithResult(format("select columns from table(%s(type=>'TeXT', lineDelimiter => 'abc'))", tableName), + listOf("ab1"), + listOf("2"), + listOf("3")); } @Test
