arina-ielchiieva commented on a change in pull request #1683: DRILL-6952: Host compliant text reader on the row set framework
URL: https://github.com/apache/drill/pull/1683#discussion_r264044441
##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java
##########

@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.mapred.FileSplit;
+
+import com.univocity.parsers.common.TextParsingException;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * New text reader, complies with the RFC 4180 standard for text/csv files
+ */
+public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNegotiator> {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompliantTextBatchReader.class);
+
+  private static final int MAX_RECORDS_PER_BATCH = 8096;
+  private static final int READ_BUFFER = 1024*1024;
+  private static final int WHITE_SPACE_BUFFER = 64*1024;
+
+  // settings to be used while parsing
+  private final TextParsingSettingsV3 settings;
+  // Chunk of the file to be read by this reader
+  private final FileSplit split;
+  // text reader implementation
+  private TextReader reader;
+  // input buffer
+  private DrillBuf readBuffer;
+  // working buffer to handle whitespaces
+  private DrillBuf whitespaceBuffer;
+  private final DrillFileSystem dfs;
+
+  private RowSetLoader writer;
+
+  public CompliantTextBatchReader(FileSplit split, DrillFileSystem dfs, TextParsingSettingsV3 settings) {
+    this.split = split;
+    this.settings = settings;
+    this.dfs = dfs;
+
+    // Validate. Otherwise, these problems show up later as a data
+    // read error which is very confusing.
+
+    if (settings.getNewLineDelimiter().length == 0) {
+      throw UserException
+          .validationError()
+          .message("The text format line delimiter cannot be blank.")
+          .build(logger);
+    }
+  }
+
+  /**
+   * Performs the initial setup required for the record reader.
+   * Initializes the input stream, the handling of the output record batch,
+   * and the actual reader to be used.
+   *
+   * @param schemaNegotiator negotiator used to declare the schema, build the
+   *        row set writer, and obtain the operator context from which the
+   *        buffers are allocated and managed
+   */
+
+  @Override
+  public boolean open(ColumnsSchemaNegotiator schemaNegotiator) {
+    final OperatorContext context = schemaNegotiator.context();
+
+    // Note: DO NOT use managed buffers here. They remain in existence
+    // until the fragment is shut down. The buffers here are large.
+    // If we scan 1000 files, and allocate 1 MB for each, we end up
+    // holding onto 1 GB of memory in managed buffers.
+    // Instead, we allocate the buffers explicitly, and must free
+    // them.
+
+    readBuffer = context.getAllocator().buffer(READ_BUFFER);
+    whitespaceBuffer = context.getAllocator().buffer(WHITE_SPACE_BUFFER);
+
+    // TODO: Set this based on size of record rather than
+    // absolute count.
+
+    schemaNegotiator.setBatchSize(MAX_RECORDS_PER_BATCH);
+
+    // Set up the output, input, and reader.
+    try {
+      TextOutput output;
+
+      if (settings.isHeaderExtractionEnabled()) {
+        output = openWithHeaders(schemaNegotiator);
+      } else {
+        output = openWithoutHeaders(schemaNegotiator);
+      }
+      if (output == null) {
+        return false;
+      }
+      openReader(output);
+      return true;
+    } catch (final IOException e) {
+      throw UserException.dataReadError(e).addContext("File Path", split.getPath().toString()).build(logger);
+    }
+  }
+
+  /**
+   * Extracts the header and uses it to set up one VarChar vector per
+   * header field.
+   *
+   * @param schemaNegotiator negotiator used to declare the schema and build the writer
+   * @return the text output to which rows are written, or null if the header
+   *         could not be extracted
+   * @throws IOException if the header cannot be read
+   */
+
+  private TextOutput openWithHeaders(ColumnsSchemaNegotiator schemaNegotiator) throws IOException {
+    final String[] fieldNames = extractHeader();
+    if (fieldNames == null) {
+      return null;
+    }
+    final TupleMetadata schema = new TupleSchema();
+    for (final String colName : fieldNames) {
+      schema.add(MaterializedField.create(colName,
+          MajorType.newBuilder()
+            .setMinorType(MinorType.VARCHAR)
+            .setMode(DataMode.REQUIRED)
+            .build()));
+    }
+    schemaNegotiator.setTableSchema(schema, true);
+    writer = schemaNegotiator.build().writer();
+    return new FieldVarCharOutput(writer);
+  }
+
+  /**
+   * Simply uses a RepeatedVarCharVector: all fields go into the single
+   * repeated "columns" column.
+   *
+   * @param schemaNegotiator negotiator used to declare the schema and build the writer
+   * @return the text output to which rows are written
+   */
+
+  private TextOutput openWithoutHeaders(
+      ColumnsSchemaNegotiator schemaNegotiator) {
+    final TupleMetadata schema = new TupleSchema();
+    schema.add(MaterializedField.create(ColumnsArrayManager.COLUMNS_COL,
+        MajorType.newBuilder()
+          .setMinorType(MinorType.VARCHAR)
+          .setMode(DataMode.REPEATED)
+          .build()));
+    schemaNegotiator.setTableSchema(schema, true);
+    writer = schemaNegotiator.build().writer();
+    return new RepeatedVarCharOutput(writer, schemaNegotiator.projectedIndexes());
+  }
+
+  /**
+   * Sets up the input using an InputStream.
+   *
+   * @param output

Review comment:
   Same here.
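For context on the "DO NOT use managed buffers" note in open(): because the two DrillBuf instances are taken directly from the operator allocator, the reader must free them itself. The diff above ends before close(), so the following is only a sketch of what the matching cleanup might look like, assuming the usual close() hook of ManagedReader and DrillBuf's Netty-style reference counting; it is not the PR's actual code.

```java
// Hedged sketch only -- the excerpt ends before close(), so this is an
// assumption about the matching cleanup, not the code under review.
@Override
public void close() {
  // The buffers were allocated directly from the operator allocator in
  // open() (not as managed buffers), so release them here rather than
  // letting them linger until fragment shutdown.
  if (readBuffer != null) {
    readBuffer.release();        // DrillBuf uses Netty-style reference counting
    readBuffer = null;
  }
  if (whitespaceBuffer != null) {
    whitespaceBuffer.release();
    whitespaceBuffer = null;
  }
}
```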
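More broadly, the class plugs into the row set scan framework through the ManagedReader contract: open() is called once per split (returning false, as when extractHeader() yields nothing, tells the framework to skip the split), next() is called once per batch, and close() ends the read. The loop below is a hypothetical illustration of that contract; the real loop lives inside the scan framework, and the variable names are assumptions.

```java
// Illustrative only: the scan framework, not user code, drives this loop.
CompliantTextBatchReader reader = new CompliantTextBatchReader(split, dfs, settings);
try {
  if (reader.open(schemaNegotiator)) {   // false => nothing to read for this split
    while (reader.next()) {              // each call fills one batch through the RowSetLoader
      // the framework harvests the completed batch here
    }
  }
} finally {
  reader.close();                        // releases the read and whitespace buffers
}
```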
