[ https://issues.apache.org/jira/browse/DRILL-8179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17649870#comment-17649870 ]
ASF GitHub Bot commented on DRILL-8179: --------------------------------------- cgivre commented on code in PR #2725: URL: https://github.com/apache/drill/pull/2725#discussion_r1053498604 ########## contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVBatchReader.java: ########## @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.store.ltsv; + +import com.github.lolo.ltsv.LtsvParser; +import com.github.lolo.ltsv.LtsvParser.Builder; +import org.apache.commons.lang3.StringUtils; +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.common.exceptions.CustomErrorContext; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip; +import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.MetadataUtils; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.shaded.guava.com.google.common.base.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.util.Date; +import java.util.Iterator; +import java.util.Map; + +public class LTSVBatchReader implements ManagedReader { + + private static final Logger logger = LoggerFactory.getLogger(LTSVBatchReader.class); + private final LTSVFormatPluginConfig config; + private final FileDescrip file; + private final CustomErrorContext errorContext; + private final LtsvParser ltsvParser; + private final RowSetLoader rowWriter; + private final FileSchemaNegotiator negotiator; + private InputStream fsStream; + private Iterator<Map<String, String>> 
rowIterator; + + + public LTSVBatchReader(LTSVFormatPluginConfig config, FileSchemaNegotiator negotiator) { + this.config = config; + this.negotiator = negotiator; + file = negotiator.file(); + errorContext = negotiator.parentErrorContext(); + ltsvParser = buildParser(); + + openFile(); + + // If there is a provided schema, import it + if (negotiator.providedSchema() != null) { + TupleMetadata schema = negotiator.providedSchema(); + negotiator.tableSchema(schema, false); + } + ResultSetLoader loader = negotiator.build(); + rowWriter = loader.writer(); + + } + + private void openFile() { + try { + fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath()); + } catch (IOException e) { + throw UserException + .dataReadError(e) + .message("Unable to open LTSV File %s", file.split().getPath() + " " + e.getMessage()) + .addContext(errorContext) + .build(logger); + } + rowIterator = ltsvParser.parse(fsStream); + } + + @Override + public boolean next() { + while (!rowWriter.isFull()) { + if (!processNextRow()) { + return false; + } + } + return true; + } + + private LtsvParser buildParser() { + Builder builder = LtsvParser.builder(); + builder.trimKeys(); + builder.trimValues(); + builder.skipNullValues(); + + if (config.getParseMode().contentEquals("strict")) { + builder.strict(); + } else { + builder.lenient(); + } + + if (StringUtils.isNotEmpty(config.getEscapeCharacter())) { + builder.withEscapeChar(config.getEscapeCharacter().charAt(0)); + } + + if (StringUtils.isNotEmpty(config.getKvDelimiter())) { + builder.withKvDelimiter(config.getKvDelimiter().charAt(0)); + } + + if (StringUtils.isNotEmpty(config.getEntryDelimiter())) { + builder.withEntryDelimiter(config.getEntryDelimiter().charAt(0)); + } + + if (StringUtils.isNotEmpty(config.getLineEnding())) { + builder.withLineEnding(config.getLineEnding().charAt(0)); + } + + if (StringUtils.isNotEmpty(config.getQuoteChar())) { + builder.withQuoteChar(config.getQuoteChar().charAt(0)); + } + + return 
builder.build(); + } + + private boolean processNextRow() { + if (!rowIterator.hasNext()) { + return false; + } + // Start the row + String key; + String value; + int columnIndex; + ScalarWriter columnWriter; + Map<String, String> row = rowIterator.next(); + + // Skip empty lines + if (row.isEmpty()) { + return true; + } + rowWriter.start(); + for (Map.Entry<String,String> field: row.entrySet()) { + key = field.getKey(); + value = field.getValue(); + columnIndex = getColumnIndex(key); + columnWriter = getColumnWriter(key); + + + if (negotiator.providedSchema() != null) { + // Check the type. LTSV will only read other data types if a schema is provided. + ColumnMetadata columnMetadata = rowWriter.tupleSchema().metadata(columnIndex); + MinorType dataType = columnMetadata.type(); + LocalTime localTime; + LocalDate localDate; + + switch (dataType) { + case BIT: + columnWriter.setBoolean(Boolean.parseBoolean(value)); + break; + case INT: + case SMALLINT: + case TINYINT: + columnWriter.setInt(Integer.parseInt(value)); + break; + case BIGINT: + columnWriter.setLong(Long.parseLong(value)); + break; + case FLOAT8: + case FLOAT4: + columnWriter.setDouble(Double.parseDouble(value)); + break; + case TIME: + columnMetadata = rowWriter.tupleSchema().metadata(key); + String dateFormat = columnMetadata.property("drill.format"); + + if (Strings.isNullOrEmpty(dateFormat)) { + localTime = LocalTime.parse(value); + } else { + localTime = LocalTime.parse(value, DateTimeFormatter.ofPattern(dateFormat)); + } + columnWriter.setTime(localTime); + break; + case DATE: + dateFormat = columnMetadata.property("drill.format"); + + if (Strings.isNullOrEmpty(dateFormat)) { + localDate = LocalDate.parse(value); + } else { + localDate = LocalDate.parse(value, DateTimeFormatter.ofPattern(dateFormat)); + } + columnWriter.setDate(localDate); + break; + case TIMESTAMP: + dateFormat = columnMetadata.property("drill.format"); + Instant timestamp; + if (Strings.isNullOrEmpty(dateFormat)) { + timestamp = 
Instant.parse(value); + } else { + try { + SimpleDateFormat simpleDateFormat = new SimpleDateFormat(dateFormat); + Date parsedDate = simpleDateFormat.parse(value); + timestamp = Instant.ofEpochMilli(parsedDate.getTime()); + } catch (ParseException e) { + throw UserException.parseError(e) + .message("Cannot parse " + value + " as a timestamp. You can specify a format string in the provided schema to correct this.") + .addContext(errorContext) + .build(logger); + } + } + columnWriter.setTimestamp(timestamp); + break; + default: + columnWriter.setString(value); + } + } else { + columnWriter.setString(value); + } + } + // Finish the row + rowWriter.save(); + return true; + } + + @Override + public void close() { + AutoCloseables.closeSilently(fsStream); Review Comment: Good idea. Done. ########## contrib/format-ltsv/src/test/java/org/apache/drill/exec/store/ltsv/TestLTSVRecordReader.java: ########## @@ -37,34 +42,77 @@ public static void setup() throws Exception { @Test public void testWildcard() throws Exception { - testBuilder() - .sqlQuery("SELECT * FROM cp.`simple.ltsv`") - .unOrdered() - .baselineColumns("host", "forwardedfor", "req", "status", "size", "referer", "ua", "reqtime", "apptime", "vhost") - .baselineValues("xxx.xxx.xxx.xxx", "-", "GET /v1/xxx HTTP/1.1", "200", "4968", "-", "Java/1.8.0_131", "2.532", "2.532", "api.example.com") - .baselineValues("xxx.xxx.xxx.xxx", "-", "GET /v1/yyy HTTP/1.1", "200", "412", "-", "Java/1.8.0_201", "3.580", "3.580", "api.example.com") - .go(); + String sql = "SELECT * FROM cp.`simple.ltsv`"; Review Comment: Done > Convert LTSV Format Plugin to EVF2 > ---------------------------------- > > Key: DRILL-8179 > URL: https://issues.apache.org/jira/browse/DRILL-8179 > Project: Apache Drill > Issue Type: Improvement > Affects Versions: 1.20.1 > Reporter: Jingchuan Hu > Assignee: Charles Givre > Priority: Major > Fix For: 2.0.0 > > > With authorization from Charles, continue the conversion of the LTSV format plugin to EVF2 directly.
-- This message was sent by Atlassian Jira (v8.20.10#820010)