Repository: tajo Updated Branches: refs/heads/master 3ae44b1d2 -> 72dd29c52
TAJO-1209: Pluggable line (de)serializer for DelimitedTextFile. Closes #1209 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/72dd29c5 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/72dd29c5 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/72dd29c5 Branch: refs/heads/master Commit: 72dd29c520981a3ffaac2150ee7306ca41192893 Parents: 3ae44b1 Author: Hyunsik Choi <[email protected]> Authored: Thu Nov 27 19:46:58 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Thu Nov 27 19:46:58 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../apache/tajo/storage/StorageConstants.java | 3 +- .../org/apache/tajo/util/ReflectionUtil.java | 4 +- .../tajo/storage/text/CSVLineDeserializer.java | 96 +++++++++++ .../apache/tajo/storage/text/CSVLineSerDe.java | 45 +++++ .../tajo/storage/text/CSVLineSerializer.java | 68 ++++++++ .../tajo/storage/text/DelimitedTextFile.java | 163 +++++++------------ .../tajo/storage/text/TextLineDeserializer.java | 60 +++++++ .../apache/tajo/storage/text/TextLineSerDe.java | 65 ++++++++ .../tajo/storage/text/TextLineSerializer.java | 45 +++++ 10 files changed, 449 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index f2141a6..6f38f65 100644 --- a/CHANGES +++ b/CHANGES @@ -67,6 +67,9 @@ Release 0.9.1 - unreleased TAJO-1188: Fix testcase testTimestampConstructor in TestTimestampDatum. (DaeMyung Kang via hyunsik) + TAJO-1209: Pluggable line (de)serializer for DelimitedTextFile. 
+ (hyunsik) + BUG FIXES http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java index 11ac9b7..3065d31 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java @@ -32,7 +32,8 @@ public class StorageConstants { public static final String TEXT_DELIMITER = "text.delimiter"; public static final String TEXT_NULL = "text.null"; - public static final String TEXTFILE_SERDE = "textfile.serde"; + public static final String TEXT_SERDE_CLASS = "text.serde.class"; + public static final String DEFAULT_TEXT_SERDE_CLASS = "org.apache.tajo.storage.text.CSVLineSerDe"; @Deprecated public static final String SEQUENCEFILE_DELIMITER = "sequencefile.delimiter"; http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java index 410815f..eccc61f 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java @@ -32,8 +32,8 @@ public class ReflectionUtil { private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = new ConcurrentHashMap<Class<?>, Constructor<?>>(); - public static Object newInstance(Class<?> clazz) - throws InstantiationException, IllegalAccessException { + public static Object newInstance(Class<?> clazz) + throws InstantiationException, IllegalAccessException { return clazz.newInstance(); } }
http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java new file mode 100644 index 0000000..f580da1 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.text; + +import io.netty.buffer.ByteBuf; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.storage.FieldSerializerDeserializer; +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; + +public class CSVLineDeserializer extends TextLineDeserializer { + private FieldSplitProcessor processor; + private FieldSerializerDeserializer fieldSerDer; + private ByteBuf nullChars; + + public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { + super(schema, meta, targetColumnIndexes); + } + + @Override + public void init() { + this.processor = new FieldSplitProcessor(CSVLineSerDe.getFieldDelimiter(meta)); + + if (nullChars != null) { + nullChars.release(); + } + nullChars = TextLineSerDe.getNullChars(meta); + + fieldSerDer = new TextFieldSerializerDeserializer(); + } + + public void deserialize(final ByteBuf lineBuf, Tuple tuple) throws IOException { + int[] projection = targetColumnIndexes; + if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) { + return; + } + + final int rowLength = lineBuf.readableBytes(); + int start = 0, fieldLength = 0, end = 0; + + //Projection + int currentTarget = 0; + int currentIndex = 0; + + while (end != -1) { + end = lineBuf.forEachByte(start, rowLength - start, processor); + + if (end < 0) { + fieldLength = rowLength - start; + } else { + fieldLength = end - start; + } + + if (projection.length > currentTarget && currentIndex == projection[currentTarget]) { + lineBuf.setIndex(start, start + fieldLength); + Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars); + tuple.put(currentIndex, datum); + currentTarget++; + } + + if (projection.length == currentTarget) { + break; + } + + start = end + 1; + currentIndex++; + } + } + + @Override + public void release() { + if (nullChars != null) { + 
nullChars.release(); + nullChars = null; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java new file mode 100644 index 0000000..e2686a6 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.text; + +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.storage.StorageConstants; + +public class CSVLineSerDe extends TextLineSerDe { + + public CSVLineSerDe() { + } + + @Override + public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { + return new CSVLineDeserializer(schema, meta, targetColumnIndexes); + } + + @Override + public TextLineSerializer createSerializer(Schema schema, TableMeta meta) { + return new CSVLineSerializer(schema, meta); + } + + public static char getFieldDelimiter(TableMeta meta) { + return StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_DELIMITER, + StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java new file mode 100644 index 0000000..684519c --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.text; + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.storage.FieldSerializerDeserializer; +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; +import java.io.OutputStream; + +public class CSVLineSerializer extends TextLineSerializer { + private FieldSerializerDeserializer serde; + + private byte [] nullChars; + private char delimiter; + + public CSVLineSerializer(Schema schema, TableMeta meta) { + super(schema, meta); + } + + @Override + public void init() { + nullChars = TextLineSerDe.getNullCharsAsBytes(meta); + delimiter = CSVLineSerDe.getFieldDelimiter(meta); + + serde = new TextFieldSerializerDeserializer(); + } + + @Override + public int serialize(OutputStream out, Tuple input) throws IOException { + int rowBytes = 0; + + for (int i = 0; i < schema.size(); i++) { + Datum datum = input.get(i); + rowBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars); + + if (schema.size() - 1 > i) { + out.write((byte) delimiter); + rowBytes += 1; + } + } + + return rowBytes; + } + + @Override + public void release() { + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index 68d89e7..d15f394 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -19,8 +19,6 @@ package org.apache.tajo.storage.text; import io.netty.buffer.ByteBuf; -import org.apache.commons.lang.StringEscapeUtils; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -35,19 +33,20 @@ import org.apache.hadoop.io.compress.Compressor; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.NullDatum; import org.apache.tajo.storage.*; import org.apache.tajo.storage.compress.CodecPool; import org.apache.tajo.storage.exception.AlreadyExistsStorageException; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; +import org.apache.tajo.util.ReflectionUtil; import java.io.BufferedOutputStream; import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class DelimitedTextFile { @@ -56,15 +55,48 @@ public class DelimitedTextFile { private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class); + /** it caches line serde classes. */ + private static final Map<String, Class<? extends TextLineSerDe>> serdeClassCache = + new ConcurrentHashMap<String, Class<? extends TextLineSerDe>>(); + + /** + * By default, DelimitedTextFileScanner uses CSVLineSerder. If a table property 'text.serde.class' is given, + * it will use the specified serder class. 
+ * + * @param meta table meta; its 'text.serde.class' option, if present, names the SerDe class + * @return a new TextLineSerDe instance + * @throws RuntimeException if the class cannot be loaded, is not a TextLineSerDe, or cannot be instantiated + */ + public static TextLineSerDe getLineSerde(TableMeta meta) { + // if there is no given serde class, it falls back to CSVLineSerDe. The class literal is used + // instead of a string constant so that a misspelled default cannot slip past the compiler. + String serDeClassName = meta.getOption(StorageConstants.TEXT_SERDE_CLASS, CSVLineSerDe.class.getName()); + + try { + Class<? extends TextLineSerDe> serdeClass = serdeClassCache.get(serDeClassName); + + if (serdeClass == null) { + // load the class actually named by the table property; asSubclass rejects classes + // that are not TextLineSerDe subtypes with a ClassCastException. + serdeClass = Class.forName(serDeClassName).asSubclass(TextLineSerDe.class); + // concurrent callers may both load and store the class for the same name; the overwrite is harmless. + serdeClassCache.put(serDeClassName, serdeClass); + } + + return (TextLineSerDe) ReflectionUtil.newInstance(serdeClass); + } catch (Throwable e) { + // keep the cause: it tells whether the class is missing, not a SerDe, or not instantiable. + throw new RuntimeException("TextLineSerde class cannot be initialized: " + serDeClassName, e); + } + } + public static class DelimitedTextFileAppender extends FileAppender { private final TableMeta meta; private final Schema schema; - private final int columnNum; private final FileSystem fs; private FSDataOutputStream fos; private DataOutputStream outputStream; private CompressionOutputStream deflateFilter; - private char delimiter; private TableStatistics stats = null; private Compressor compressor; private CompressionCodecFactory codecFactory; @@ -76,7 +108,7 @@ public class DelimitedTextFile { private long pos = 0; private NonSyncByteArrayOutputStream os; - private FieldSerializerDeserializer serde; + private TextLineSerializer serializer; public DelimitedTextFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException { @@ -84,17 +116,10 @@ public class DelimitedTextFile { this.fs = path.getFileSystem(conf); this.meta = meta; this.schema = schema; - this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_DELIMITER, - StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0); - this.columnNum = schema.size(); - - String nullCharacters =
StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_NULL, - NullDatum.DEFAULT_TEXT)); - if (StringUtils.isEmpty(nullCharacters)) { - nullChars = NullDatum.get().asTextBytes(); - } else { - nullChars = nullCharacters.getBytes(); - } + } + + public TextLineSerDe getLineSerde() { + return DelimitedTextFile.getLineSerde(meta); } @Override @@ -133,7 +158,8 @@ public class DelimitedTextFile { this.stats = new TableStatistics(this.schema); } - serde = new TextFieldSerializerDeserializer(); + serializer = getLineSerde().createSerializer(schema, meta); + serializer.init(); if (os == null) { os = new NonSyncByteArrayOutputStream(BUFFER_SIZE); @@ -145,26 +171,20 @@ public class DelimitedTextFile { super.init(); } - @Override public void addTuple(Tuple tuple) throws IOException { - Datum datum; - int rowBytes = 0; + // write + int rowBytes = serializer.serialize(os, tuple); - for (int i = 0; i < columnNum; i++) { - datum = tuple.get(i); - rowBytes += serde.serialize(os, datum, schema.getColumn(i), i, nullChars); - - if (columnNum - 1 > i) { - os.write((byte) delimiter); - rowBytes += 1; - } - } + // new line os.write(LF); rowBytes += 1; + // update positions pos += rowBytes; bufferedBytes += rowBytes; + + // refill buffer if necessary if (bufferedBytes > BUFFER_SIZE) { flushBuffer(); } @@ -197,6 +217,8 @@ public class DelimitedTextFile { public void close() throws IOException { try { + serializer.release(); + if(outputStream != null){ flush(); } @@ -241,18 +263,15 @@ public class DelimitedTextFile { } public static class DelimitedTextFileScanner extends FileScanner { - private boolean splittable = false; private final long startOffset; - private final long endOffset; + private final long endOffset; private int recordCount = 0; private int[] targetColumnIndexes; - private ByteBuf nullChars; - private FieldSerializerDeserializer serde; private DelimitedLineReader reader; - private FieldSplitProcessor processor; + private TextLineDeserializer deserializer; 
public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment) @@ -265,30 +284,14 @@ public class DelimitedTextFile { startOffset = fragment.getStartKey(); endOffset = startOffset + fragment.getEndKey(); + } - //Delimiter - String delim = meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); - this.processor = new FieldSplitProcessor(StringEscapeUtils.unescapeJava(delim).charAt(0)); + public TextLineSerDe getLineSerde() { + return DelimitedTextFile.getLineSerde(meta); } @Override public void init() throws IOException { - if (nullChars != null) { - nullChars.release(); - } - - String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL, - NullDatum.DEFAULT_TEXT)); - byte[] bytes; - if (StringUtils.isEmpty(nullCharacters)) { - bytes = NullDatum.get().asTextBytes(); - } else { - bytes = nullCharacters.getBytes(); - } - - nullChars = BufferPool.directBuffer(bytes.length, bytes.length); - nullChars.writeBytes(bytes); - if (reader != null) { reader.close(); } @@ -305,8 +308,6 @@ public class DelimitedTextFile { targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName()); } - serde = new TextFieldSerializerDeserializer(); - super.init(); Arrays.sort(targetColumnIndexes); if (LOG.isDebugEnabled()) { @@ -316,6 +317,9 @@ public class DelimitedTextFile { if (startOffset > 0) { reader.readLine(); // skip first line; } + + deserializer = getLineSerde().createDeserializer(schema, meta, targetColumnIndexes); + deserializer.init(); } public ByteBuf readLine() throws IOException { @@ -362,7 +366,7 @@ public class DelimitedTextFile { } VTuple tuple = new VTuple(schema.size()); - fillTuple(schema, tuple, buf, targetColumnIndexes); + deserializer.deserialize(buf, tuple); return tuple; } catch (Throwable t) { LOG.error("Tuple list current index: " + recordCount + " file offset:" + reader.getCompressedPosition(), t); @@ -370,44 +374,6 @@ 
public class DelimitedTextFile { } } - private void fillTuple(Schema schema, Tuple dst, ByteBuf lineBuf, int[] target) throws IOException { - int[] projection = target; - if (lineBuf == null || target == null || target.length == 0) { - return; - } - - final int rowLength = lineBuf.readableBytes(); - int start = 0, fieldLength = 0, end = 0; - - //Projection - int currentTarget = 0; - int currentIndex = 0; - - while (end != -1) { - end = lineBuf.forEachByte(start, rowLength - start, processor); - - if (end < 0) { - fieldLength = rowLength - start; - } else { - fieldLength = end - start; - } - - if (projection.length > currentTarget && currentIndex == projection[currentTarget]) { - lineBuf.setIndex(start, start + fieldLength); - Datum datum = serde.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars); - dst.put(currentIndex, datum); - currentTarget++; - } - - if (projection.length == currentTarget) { - break; - } - - start = end + 1; - currentIndex++; - } - } - @Override public void reset() throws IOException { init(); @@ -416,10 +382,7 @@ public class DelimitedTextFile { @Override public void close() throws IOException { try { - if (nullChars != null) { - nullChars.release(); - nullChars = null; - } + deserializer.release(); if (tableStats != null && reader != null) { tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. 
(decompressed bytes + overhead) http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java new file mode 100644 index 0000000..645d118 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.text; + +import io.netty.buffer.ByteBuf; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; + +/** + * Reads a text line and fills a Tuple with values + */ +public abstract class TextLineDeserializer { + protected Schema schema; + protected TableMeta meta; + protected int [] targetColumnIndexes; + + public TextLineDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes) { + this.schema = schema; + this.meta = meta; + this.targetColumnIndexes = targetColumnIndexes; + } + + /** + * Initialize SerDe + */ + public abstract void init(); + + /** + * It fills a tuple with a read fields in a given line. + * + * @param buf Read line + * @param tuple Tuple to be filled with read fields + * @throws java.io.IOException + */ + public abstract void deserialize(final ByteBuf buf, Tuple tuple) throws IOException; + + /** + * Release external resources + */ + public abstract void release(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java new file mode 100644 index 0000000..e81e289 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.text; + +import io.netty.buffer.ByteBuf; +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.storage.BufferPool; +import org.apache.tajo.storage.StorageConstants; + +/** + * Pluggable Text Line SerDe class + */ +public abstract class TextLineSerDe { + + public TextLineSerDe() { + } + + public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes); + + public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta); + + public static ByteBuf getNullChars(TableMeta meta) { + byte[] nullCharByteArray = getNullCharsAsBytes(meta); + + ByteBuf nullChars = BufferPool.directBuffer(nullCharByteArray.length, nullCharByteArray.length); + nullChars.writeBytes(nullCharByteArray); + + return nullChars; + } + + public static byte [] getNullCharsAsBytes(TableMeta meta) { + byte [] nullChars; + + String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL, + NullDatum.DEFAULT_TEXT)); + if (StringUtils.isEmpty(nullCharacters)) { + nullChars = NullDatum.get().asTextBytes(); + } else { + nullChars = nullCharacters.getBytes(); + } + + return nullChars; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/72dd29c5/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java 
---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java new file mode 100644 index 0000000..0c2761f --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.text; + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * Write a Tuple into single text formatted line + */ +public abstract class TextLineSerializer { + protected Schema schema; + protected TableMeta meta; + + public TextLineSerializer(Schema schema, TableMeta meta) { + this.schema = schema; + this.meta = meta; + } + + public abstract void init(); + + public abstract int serialize(OutputStream out, Tuple input) throws IOException; + + public abstract void release(); +}
